<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<script id="security-template" type="text/x-handlebars-template">
    <h3 class="anchor-heading"><a id="security_overview" class="anchor-link"></a><a href="#security_overview">7.1 Security Overview</a></h3>
    In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster. The following security measures are currently supported:
    <ol>
        <li>Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL. Kafka supports the following SASL mechanisms:
            <ul>
                <li>SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0</li>
                <li>SASL/PLAIN - starting at version 0.10.0.0</li>
                <li>SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0</li>
                <li>SASL/OAUTHBEARER - starting at version 2.0</li>
            </ul></li>
        <li>Authentication of connections from brokers to ZooKeeper</li>
        <li>Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)</li>
        <li>Authorization of read / write operations by clients</li>
        <li>Authorization is pluggable and integration with external authorization services is supported</li>
    </ol>

    It's worth noting that security is optional - non-secured clusters are supported, as well as a mix of authenticated, unauthenticated, encrypted and non-encrypted clients.

    The guides below explain how to configure and use the security features in both clients and brokers.

    <h3 class="anchor-heading"><a id="listener_configuration" class="anchor-link"></a><a href="#listener_configuration">7.2 Listener Configuration</a></h3>

    <p>In order to secure a Kafka cluster, it is necessary to secure the channels that are used to
      communicate with the servers. Each server must define the set of listeners that are used to
      receive requests from clients as well as other servers. Each listener may be configured
      to authenticate clients using various mechanisms and to ensure traffic between the
      server and the client is encrypted. This section provides a primer for the configuration
      of listeners.</p>

    <p>Kafka servers support listening for connections on multiple ports. This is configured through
      the <code>listeners</code> property in the server configuration, which accepts a comma-separated
      list of the listeners to enable. At least one listener must be defined on each server. The format
      of each listener defined in <code>listeners</code> is given below:</p>
	  
    <pre><code class="language-text">{LISTENER_NAME}://{hostname}:{port}</code></pre>
	    
    <p>The <code>LISTENER_NAME</code> is usually a descriptive name which defines the purpose of
      the listener. For example, many configurations use a separate listener for client traffic,
      so they might refer to the corresponding listener as <code>CLIENT</code> in the configuration:</p>
      
    <pre><code class="language-text">listeners=CLIENT://localhost:9092</code></pre>
      
    <p>The security protocol of each listener is defined in a separate configuration:
      <code>listener.security.protocol.map</code>. The value is a comma-separated list
      of each listener mapped to its security protocol. For example, the following
      configuration specifies that the <code>CLIENT</code> listener will use SSL while the
      <code>BROKER</code> listener will use plaintext.</p>
    
    <pre><code class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
	    
    <p>Possible options (case-insensitive) for the security protocol are given below:</p>
    <ol>
      <li>PLAINTEXT</li>
      <li>SSL</li>
      <li>SASL_PLAINTEXT</li>
      <li>SASL_SSL</li>
    </ol>

    <p>The plaintext protocol provides no security and does not require any additional configuration.
      In the following sections, this document covers how to configure the remaining protocols.</p>

    <p>If each required listener uses a separate security protocol, it is also possible to use the
      security protocol name as the listener name in <code>listeners</code>. Using the example above,
      we could skip the definition of the <code>CLIENT</code> and <code>BROKER</code> listeners
      using the following definition:</p>
    
    <pre><code class="language-text">listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093</code></pre>
      
    <p>However, we recommend that users provide explicit names for the listeners since it
      makes the intended usage of each listener clearer.</p>
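
    <p>As a minimal sketch, the explicitly named equivalent of the definition above might look as
      follows. The ports simply mirror the earlier examples and are assumptions, not requirements:</p>

    <pre><code class="language-text">listeners=CLIENT://localhost:9092,BROKER://localhost:9093
listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>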

    <p>Among the listeners in this list, it is possible to declare the listener to be used for
      inter-broker communication by setting the <code>inter.broker.listener.name</code> configuration
      to the name of the listener. The primary purpose of the inter-broker listener is
      partition replication. If not defined, then the inter-broker listener is determined
      by the security protocol defined by <code>security.inter.broker.protocol</code>, which
      defaults to <code>PLAINTEXT</code>.</p>
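
    <p>For illustration, a broker that dedicates a <code>BROKER</code> listener to inter-broker traffic
      could be sketched as below. The host, ports, and the choice of SSL are assumptions. Note that
      <code>inter.broker.listener.name</code> and <code>security.inter.broker.protocol</code> should
      not both be set:</p>

    <pre><code class="language-text">listeners=CLIENT://localhost:9092,BROKER://localhost:9093
listener.security.protocol.map=CLIENT:SSL,BROKER:SSL
inter.broker.listener.name=BROKER</code></pre>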
    
    <p>For legacy clusters which rely on ZooKeeper to store cluster metadata, it is possible to
      declare a separate listener to be used for metadata propagation from the active controller
      to the brokers. This is defined by <code>control.plane.listener.name</code>. The active controller
      will use this listener when it needs to push metadata updates to the brokers in the cluster.
      The benefit of using a control plane listener is that it uses a separate processing thread,
      which makes it less likely for application traffic to impede timely propagation of metadata changes
      (such as partition leader and ISR updates). Note that the default value is null, which
      means that the controller will use the same listener defined by <code>inter.broker.listener.name</code>.</p>
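
    <p>As a sketch, a ZooKeeper-based broker with a dedicated control plane listener might be
      configured as follows. The listener names, host, and ports are hypothetical:</p>

    <pre><code class="language-text">listeners=INTERNAL://localhost:9092,EXTERNAL://localhost:9093,CONTROLLER://localhost:9094
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL
inter.broker.listener.name=INTERNAL
control.plane.listener.name=CONTROLLER</code></pre>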
    
    <p>In a KRaft cluster, a broker is any server which has the <code>broker</code> role enabled
      in <code>process.roles</code> and a controller is any server which has the <code>controller</code>
      role enabled. Listener configuration depends on the role. The listener defined by
      <code>inter.broker.listener.name</code> is used exclusively for requests between brokers.
      Controllers, on the other hand, must use a separate listener which is defined by the
      <code>controller.listener.names</code> configuration. This cannot be set to the same
      value as the inter-broker listener.</p>

    <p>Controllers receive requests both from other controllers and from brokers. For
      this reason, even if a server does not have the <code>controller</code> role enabled
      (i.e. it is just a broker), it must still define the controller listener along with
      any security properties that are needed to configure it. For example, we might
      use the following configuration on a standalone broker:</p>
      
    <pre><code class="language-text">process.roles=broker
listeners=BROKER://localhost:9092
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>

    <p>The controller listener is still configured in this example to use the <code>SASL_SSL</code>
      security protocol, but it is not included in <code>listeners</code> since the broker
      does not expose the controller listener itself. The port that will be used in this case
      comes from the <code>controller.quorum.voters</code> configuration, which defines
      the complete list of controllers.</p>

    <p>For KRaft servers which have both the broker and controller role enabled, the configuration
      is similar. The only difference is that the controller listener must be included in
      <code>listeners</code>:</p>
    
    <pre><code class="language-text">process.roles=broker,controller
listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
inter.broker.listener.name=BROKER
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL</code></pre>

    <p>It is a requirement for the port defined in <code>controller.quorum.voters</code> to
      exactly match one of the exposed controller listeners. For example, here the
      <code>CONTROLLER</code> listener is bound to port 9093. The connection string
      defined by <code>controller.quorum.voters</code> must then also use port 9093,
      as it does here.</p>
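
    <p>By the same logic, a server with only the <code>controller</code> role would expose just the
      controller listener. The following is a sketch under that assumption: <code>node.id</code> is
      added here so that the voter entry <code>0@localhost:9093</code> can be matched to this server,
      and the SASL and SSL settings that the listener itself needs are omitted:</p>

    <pre><code class="language-text">process.roles=controller
node.id=0
listeners=CONTROLLER://localhost:9093
controller.quorum.voters=0@localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SASL_SSL</code></pre>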

    <p>The controller will accept requests on all listeners defined by <code>controller.listener.names</code>.
      Typically there would be just one controller listener, but it is possible to have more.
      For example, this provides a way to change the active listener from one port or security
      protocol to another through a roll of the cluster (one roll to expose the new listener,
      and one roll to remove the old listener). When multiple controller listeners are defined,
      the first one in the list will be used for outbound requests.</p>

    <p>It is conventional in Kafka to use a separate listener for clients. This allows the
      intra-cluster listeners to be isolated at the network level. In the case of the controller
      listener in KRaft, the listener should be isolated since clients do not work with it
      anyway. Clients are expected to connect to any other listener configured on a broker.
      Any requests that are bound for the controller will be forwarded as described
      <a href="#kraft_principal_forwarding">below</a>.</p>
    
    <p>In the following <a href="#security_ssl">section</a>, this document covers how to enable SSL
      on a listener for encryption as well as authentication. The subsequent <a href="#security_sasl">section</a> will then
      cover additional authentication mechanisms using SASL.</p>
    
    <h3 class="anchor-heading"><a id="security_ssl" class="anchor-link"></a><a href="#security_ssl">7.3 Encryption and Authentication using SSL</a></h3>
    Apache Kafka allows clients to use SSL for encryption of traffic as well as authentication. By default, SSL is disabled but can be turned on if needed.
    The following paragraphs explain in detail how to set up your own PKI, use it to create certificates, and configure Kafka to use them.

    <ol>
        <li><h4 class="anchor-heading"><a id="security_ssl_key" class="anchor-link"></a><a href="#security_ssl_key">Generate SSL key and certificate for each Kafka broker</a></h4>
            The first step of deploying one or more brokers with SSL support is to generate a public/private keypair for every server.
            Since Kafka expects all keys and certificates to be stored in keystores, we will use Java's keytool command for this task.
            The tool supports two keystore formats: the Java-specific JKS format, which is now deprecated, and PKCS12.
            PKCS12 is the default format as of Java 9. To ensure this format is used regardless of the Java version, all following
            commands explicitly specify the PKCS12 format.
            <pre><code class="language-bash">$ keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12</code></pre>
            You need to specify two parameters in the above command:
            <ol>
                <li>keystorefile: the keystore file that stores the keys (and later the certificate) for this broker. The keystore file contains the private
                    and public keys of this broker, therefore it needs to be kept safe. Ideally this step is run on the Kafka broker that the key will be
                    used on, as this key should never leave the server that it is intended for.</li>
                <li>validity: the valid time of the key in days. Please note that this differs from the validity period for the certificate, which
                    will be determined in <a href ="#security_ssl_signing">Signing the certificate</a>. You can use the same key to request multiple
                    certificates: if your key has a validity of 10 years, but your CA will only sign certificates that are valid for one year, you
                    can use the same key with 10 certificates over time.</li>
            </ol><br>
            To obtain a certificate that can be used with the private key that was just created, a certificate signing request needs to be created. This
            signing request, when signed by a trusted CA, results in the actual certificate, which can then be installed in the keystore and used for
            authentication purposes.<br>
            To generate certificate signing requests run the following command for all server keystores created so far.

            <pre><code class="language-bash">$ keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
            This command assumes that you want to add hostname information to the certificate. If this is not the case, you can omit the extension parameter <code>-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code>. Please see below for more information on this.

            <h5>Host Name Verification</h5>
            Host name verification, when enabled, is the process of checking attributes from the certificate that is presented by the server you are
            connecting to against the actual hostname or IP address of that server to ensure that you are indeed connecting to the correct server.<br>
            The main reason for this check is to prevent man-in-the-middle attacks.

            For Kafka, this check was disabled by default for a long time, but as of Kafka 2.0.0 host name verification of servers is enabled by default
            for client connections as well as inter-broker connections.<br>
            Server host name verification may be disabled by setting <code>ssl.endpoint.identification.algorithm</code> to an empty string.<br>
            For dynamically configured broker listeners, hostname verification may be disabled using <code>kafka-configs.sh</code>:<br>

            <pre><code class="language-bash">$ bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="</code></pre>
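
            For clients and listeners that are configured statically, the same property can be set in the properties file instead.
            A minimal sketch, assuming a client configuration file:
            <pre><code class="language-text"># An empty value disables server host name verification; the default value is "https".
ssl.endpoint.identification.algorithm=</code></pre>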

            <p><b>Note:</b></p>
            Normally there is no good reason to disable hostname verification apart from being the quickest way to "just get it to work" followed
            by the promise to "fix it later when there is more time"!<br>
            Getting hostname verification right is not that hard when done at the right time, but gets much harder once the cluster is up and
            running - do yourself a favor and do it now!


            <p>If host name verification is enabled, clients will verify the server's fully qualified domain name (FQDN) or IP address against one of the following two fields:
            <ol>
                <li>Common Name (CN)</li>
                <li><a href="https://tools.ietf.org/html/rfc5280#section-4.2.1.6">Subject Alternative Name (SAN)</a></li>
            </ol><br>
            While Kafka checks both fields, usage of the common name field for hostname verification has been
            <a href="https://tools.ietf.org/html/rfc2818#section-3.1">deprecated</a> since 2000 and should be avoided if possible. In addition the
            SAN field is much more flexible, allowing for multiple DNS and IP entries to be declared in a certificate.<br>
            Another advantage is that if the SAN field is used for hostname verification the common name can be set to a more meaningful value for
            authorization purposes. Since we need the SAN field to be contained in the signed certificate, it will be specified when generating the
            signing request. It can also be specified when generating the keypair, but this will not automatically be copied into the signing request.<br>


            To add a SAN field append the following argument <code> -ext SAN=DNS:{FQDN},IP:{IPADDRESS}</code> to the keytool command:
            <pre><code class="language-bash">$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}</code></pre>
        </li>

        <li><h4 class="anchor-heading"><a id="security_ssl_ca" class="anchor-link"></a><a href="#security_ssl_ca">Creating your own CA</a></h4>
            After this step each machine in the cluster has a public/private key pair which can already be used to encrypt traffic and a certificate
            signing request, which is the basis for creating a certificate. To add authentication capabilities this signing request needs to be signed
            by a trusted authority, which will be created in this step.

            <p>A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports - the government
            stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is
            authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to
            forge. Thus, as long as the CA is a genuine and trusted authority, the clients have a strong assurance that they are connecting to the authentic
            machines.

            <p>For this guide we will be our own Certificate Authority. When setting up a production cluster in a corporate environment these certificates would
            usually be signed by a corporate CA that is trusted throughout the company. Please see <a href="#security_ssl_production">Common Pitfalls in
            Production</a> for some things to consider for this case.

            <p>Due to a <a href="https://www.openssl.org/docs/man1.1.1/man1/x509.html#BUGS">bug</a> in OpenSSL, the x509 module will not copy requested
            extension fields from CSRs into the final certificate. Since we want the SAN extension to be present in our certificate to enable hostname
            verification, we'll use the <i>ca</i> module instead. This requires some additional configuration to be in place before we generate our
            CA keypair.<br>

            Save the following listing into a file called openssl-ca.cnf and adjust the values for validity and common attributes as necessary.
            <pre><code class="language-bash">HOME            = .
RANDFILE        = $ENV::HOME/.rnd

####################################################################
[ ca ]
default_ca    = CA_default      # The default ca section

[ CA_default ]

base_dir      = .
certificate   = $base_dir/cacert.pem   # The CA certificate
private_key   = $base_dir/cakey.pem    # The CA private key
new_certs_dir = $base_dir              # Location for new certs after signing
database      = $base_dir/index.txt    # Database index file
serial        = $base_dir/serial.txt   # The current serial number

default_days     = 1000         # How long to certify for
default_crl_days = 30           # How long before next CRL
default_md       = sha256       # Message digest used when signing
preserve         = no           # Keep passed DN ordering

x509_extensions = ca_extensions # The extensions to add to the cert

email_in_dn     = no            # Don't concat the email in the DN
copy_extensions = copy          # Required to copy SANs from CSR to cert

####################################################################
[ req ]
default_bits       = 4096
default_keyfile    = cakey.pem
distinguished_name = ca_distinguished_name
x509_extensions    = ca_extensions
string_mask        = utf8only

####################################################################
[ ca_distinguished_name ]
countryName         = Country Name (2 letter code)
countryName_default = DE

stateOrProvinceName         = State or Province Name (full name)
stateOrProvinceName_default = Test Province

localityName                = Locality Name (eg, city)
localityName_default        = Test Town

organizationName            = Organization Name (eg, company)
organizationName_default    = Test Company

organizationalUnitName         = Organizational Unit (eg, division)
organizationalUnitName_default = Test Unit

commonName         = Common Name (e.g. server FQDN or YOUR name)
commonName_default = Test Name

emailAddress         = Email Address
emailAddress_default = test@test.com

####################################################################
[ ca_extensions ]

subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid:always, issuer
basicConstraints       = critical, CA:true
keyUsage               = keyCertSign, cRLSign

####################################################################
[ signing_policy ]
countryName            = optional
stateOrProvinceName    = optional
localityName           = optional
organizationName       = optional
organizationalUnitName = optional
commonName             = supplied
emailAddress           = optional

####################################################################
[ signing_req ]
subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints       = CA:FALSE
keyUsage               = digitalSignature, keyEncipherment</code></pre>

            Then create a database and serial number file. These will be used to keep track of which certificates were signed with this CA. Both of
            these are simply text files that reside in the same directory as your CA keys.

            <pre><code class="language-bash">$ echo 01 &gt; serial.txt
$ touch index.txt</code></pre>

            With these steps done you are now ready to generate your CA that will be used to sign certificates later.

            <pre><code class="language-bash">$ openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM</code></pre>

            The CA is simply a public/private key pair and certificate that is signed by itself, and is only intended to sign other certificates.<br>
            This keypair should be kept very safe. If someone gains access to it, they can create and sign certificates that will be trusted by your
            infrastructure, which means they will be able to impersonate anybody when connecting to any service that trusts this CA.<br>

            The next step is to add the generated CA to the <b>clients' truststore</b> so that the clients can trust this CA:
            <pre><code class="language-bash">$ keytool -keystore client.truststore.jks -alias CARoot -import -file cacert.pem</code></pre>

            <b>Note:</b>
            If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" in the
            <a href="#brokerconfigs">Kafka brokers config</a> then you must provide a truststore for the Kafka brokers as well and it should have
            all the CA certificates that clients' keys were signed by.
            <pre><code class="language-bash">$ keytool -keystore server.truststore.jks -alias CARoot -import -file cacert.pem</code></pre>

            In contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates
            that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that
            certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This
            attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates
            in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all
            other machines.
        </li>
        <li><h4 class="anchor-heading"><a id="security_ssl_signing" class="anchor-link"></a><a href="#security_ssl_signing">Signing the certificate</a></h4>
            Next, sign the certificate signing request with the CA:
            <pre><code class="language-bash">$ openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}</code></pre>

            Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
            <pre><code class="language-bash">$ keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
$ keytool -keystore {keystore} -alias localhost -import -file {server certificate}</code></pre>

            The definitions of the parameters are the following:
            <ol>
                <li>keystore: the location of the keystore</li>
                <li>CA certificate: the certificate of the CA</li>
                <li>certificate signing request: the csr created with the server key</li>
                <li>server certificate: the file to write the signed certificate of the server to</li>
            </ol>
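
            The signed certificate can be checked against the CA with <code>openssl verify</code>, ideally before it is imported.
            The sketch below is self-contained and uses throwaway keys with hypothetical file names. It signs with
            <code>openssl x509 -req</code> purely for brevity, so unlike the <i>ca</i> module setup described above it does not
            carry over any SAN extension.
            <pre><code class="language-bash"># All files are created in a temporary directory; every name here is hypothetical.
workdir=$(mktemp -d)

# Throwaway self-signed CA (a real CA key would be protected much more carefully).
openssl req -x509 -newkey rsa:2048 -nodes -days 1 -subj "/CN=Sketch CA" \
    -keyout "$workdir/ca-key.pem" -out "$workdir/ca-cert.pem"

# Throwaway server key and certificate signing request.
openssl req -newkey rsa:2048 -nodes -subj "/CN=broker1.example.com" \
    -keyout "$workdir/server-key.pem" -out "$workdir/server.csr"

# Sign the request with the throwaway CA.
openssl x509 -req -in "$workdir/server.csr" -days 1 \
    -CA "$workdir/ca-cert.pem" -CAkey "$workdir/ca-key.pem" -CAcreateserial \
    -out "$workdir/server-cert.pem"

# Check that the signed certificate chains back to the CA.
verify_result=$(openssl verify -CAfile "$workdir/ca-cert.pem" "$workdir/server-cert.pem")
echo "$verify_result"</code></pre>
            With the files from this guide, only the last step would be needed, pointing <code>-CAfile</code> at the CA certificate
            and passing the signed server certificate as the final argument.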

            This will leave you with one truststore called <i>truststore.jks</i>. It can be the same for all clients and brokers and does not
            contain any sensitive information, so there is no need to secure it.<br>
            Additionally, you will have one <i>server.keystore.jks</i> file per node, which contains that node's keys, its certificate, and your CA's certificate.
            Please refer to <a href="#security_configbroker">Configuring Kafka Brokers</a> and <a href="#security_configclients">Configuring Kafka Clients</a>
            for information on how to use these files.

            <p>For some tooling assistance on this topic, please check out the <a href="https://github.com/OpenVPN/easy-rsa">easyRSA</a> project which has
                extensive scripting in place to help with these steps.</p>

            <h5>SSL key and certificates in PEM format</h5>
            From 2.7.0 onwards, SSL key and trust stores can be configured for Kafka brokers and clients directly in the configuration in PEM format.
            This avoids the need to store separate files on the file system and benefits from password protection features of Kafka configuration.
            PEM may also be used as the store type for file-based key and trust stores in addition to JKS and PKCS12. To configure a PEM key store directly in the
            broker or client configuration, the private key in PEM format should be provided in <code>ssl.keystore.key</code> and the certificate chain in PEM format
            should be provided in <code>ssl.keystore.certificate.chain</code>. To configure a trust store, the trusted certificates, e.g. the public certificate of the CA,
            should be provided in <code>ssl.truststore.certificates</code>. Since PEM is typically stored as multi-line base-64 strings, the configuration value
            can be included in Kafka configuration as multi-line strings with lines terminating in backslash ('\') for line continuation.
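
            As a rough sketch, an inline PEM configuration might look as follows. The values in braces are placeholders for the actual
            base-64 content, the unencrypted key is only for illustration, and setting the store types to <code>PEM</code> is an
            assumption based on PEM being described as a store type above:
            <pre><code class="language-text">ssl.keystore.type=PEM
ssl.keystore.key=-----BEGIN PRIVATE KEY----- \
    {base64 private key} \
    -----END PRIVATE KEY-----
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- \
    {base64 server certificate} \
    -----END CERTIFICATE-----
ssl.truststore.type=PEM
ssl.truststore.certificates=-----BEGIN CERTIFICATE----- \
    {base64 CA certificate} \
    -----END CERTIFICATE-----</code></pre>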

            <p>Store password configs <code>ssl.keystore.password</code> and <code>ssl.truststore.password</code> are not used for PEM.
            If the private key is encrypted using a password, the key password must be provided in <code>ssl.key.password</code>. Private keys may be provided
            in unencrypted form without a password. In production deployments, configs should be encrypted or
            externalized using the password protection feature in Kafka in this case. Note that the default SSL engine factory has limited capabilities for decryption
            of encrypted private keys when external tools like OpenSSL are used for encryption. Third party libraries like BouncyCastle may be integrated with a
            custom <code>SslEngineFactory</code> to support a wider range of encrypted private keys.</p>

        </li>
        <li><h4 class="anchor-heading"><a id="security_ssl_production" class="anchor-link"></a><a href="#security_ssl_production">Common Pitfalls in Production</a></h4>
            The above paragraphs show the process to create your own CA and use it to sign certificates for your cluster.
            While very useful for sandbox, dev, test, and similar systems, this is usually not the correct process to create certificates for a production
            cluster in a corporate environment.
            Enterprises will normally operate their own CA, and users can send in CSRs to be signed with this CA. This relieves users of the
            responsibility for keeping the CA secure and provides a central authority that everybody can trust.
            However, it also takes away a lot of control over the process of signing certificates from the user. Quite often the people operating corporate
            CAs will apply tight restrictions on certificates that can cause issues when trying to use these certificates with Kafka.

            <ol>
                <li><b><a href="https://tools.ietf.org/html/rfc5280#section-4.2.1.12">Extended Key Usage</a></b><br>Certificates may contain an extension
                    field that controls the purpose for which the certificate can be used. If this field is empty, there are no restrictions on the usage,
                    but if any usage is specified in here, valid SSL implementations have to enforce these usages.<br>
                    Relevant usages for Kafka are:
                    <ul>
                        <li>Client authentication</li>
                        <li>Server authentication</li>
                    </ul>
                    Kafka brokers need both these usages to be allowed, as for intra-cluster communication every broker will behave as both the client and
                    the server towards other brokers. It is not uncommon for corporate CAs to have a signing profile for webservers and use this for Kafka as
                    well, which will only contain the <i>serverAuth</i> usage value and cause the SSL handshake to fail.
                </li>
                <li><b>Intermediate Certificates</b><br>
                    Corporate root CAs are often kept offline for security reasons. To enable day-to-day usage, so-called intermediate CAs are created, which
                    are then used to sign the final certificates. When importing a certificate that was signed by an intermediate CA into the keystore, it is
                    necessary to provide the entire chain of trust up to the root CA. This can be done by simply <i>cat</i>ing the certificate files into one
                    combined certificate file and then importing this file with keytool.
                </li>
                <li><b>Failure to copy extension fields</b><br>
                    CA operators are often hesitant to copy any requested extension fields from CSRs and prefer to specify these themselves, as this makes it
                    harder for a malicious party to obtain certificates with potentially misleading or fraudulent values.
                    It is advisable to double-check that signed certificates contain all requested SAN fields, so that hostname verification works properly.
                    The following command prints the certificate details to the console, which should be compared with what was originally requested:
                    <pre><code class="language-bash">$ openssl x509 -in certificate.crt -text -noout</code></pre>
                </li>
            </ol>
        </li>

        <li><h4 class="anchor-heading"><a id="security_configbroker" class="anchor-link"></a><a href="#security_configbroker">Configuring Kafka Brokers</a></h4>

            If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
            <pre><code class="language-text">listeners=PLAINTEXT://host.name:port,SSL://host.name:port</code></pre>

            The following SSL configs are needed on the broker side:
            <pre><code class="language-text">ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234</code></pre>

            Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

            Optional settings that are worth considering:
            <ol>
                <li>ssl.client.auth=none ("required" => client authentication is required, "requested" => client authentication is requested and clients without certs can still connect. The usage of "requested" is discouraged as it provides a false sense of security and misconfigured clients will still connect successfully.)</li>
                <li>ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. (Default is an empty list)</li>
                <li>ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients. Do note that SSL is deprecated in favor of TLS and using SSL in production is not recommended)</li>
                <li>ssl.keystore.type=JKS</li>
                <li>ssl.truststore.type=JKS</li>
                <li>ssl.secure.random.implementation=SHA1PRNG</li>
            </ol>
            If you want to enable SSL for inter-broker communication, add the following to the server.properties file (it defaults to PLAINTEXT)
            <pre><code class="language-text">security.inter.broker.protocol=SSL</code></pre>
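            <p>Putting the settings above together, a broker that uses SSL for both client and inter-broker traffic might be configured roughly as follows.
            This is a sketch rather than a complete server.properties: the host, port, paths and passwords are the placeholders used above, it assumes
            that no client still needs the PLAINTEXT port, and <code>ssl.client.auth=required</code> assumes that every client has its own certificate.</p>
            <pre><code class="language-text"># Only an SSL listener, because inter-broker traffic also uses SSL
listeners=SSL://host.name:port
security.inter.broker.protocol=SSL
# Assumption: every client presents a certificate
ssl.client.auth=required
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234</code></pre>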

            <p>
            Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the <a href="https://www.oracle.com/technetwork/java/javase/downloads/index.html">JCE Unlimited Strength Jurisdiction Policy Files</a> must be obtained and installed in the JDK/JRE. See the
            <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/SunProviders.html">JCA Providers Documentation</a> for more information.
            </p>

            <p>
            The JRE/JDK will have a default pseudo-random number generator (PRNG) that is used for cryptography operations, so it is not required to configure the
            implementation used with the <code>ssl.secure.random.implementation</code>. However, there are performance issues with some implementations (notably, the
            default chosen on Linux systems, <code>NativePRNG</code>, utilizes a global lock). In cases where performance of SSL connections becomes an issue,
            consider explicitly setting the implementation to be used. The <code>SHA1PRNG</code> implementation is non-blocking, and has shown very good performance
            characteristics under heavy load (50 MB/sec of produced messages, plus replication traffic, per-broker).
            </p>

            Once you start the broker, you should see the following in server.log:
            <pre><code class="language-text">with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</code></pre>

            To quickly check whether the server keystore and truststore are set up properly, you can run the following command:
            <pre><code class="language-bash">$ openssl s_client -debug -connect localhost:9093 -tls1</code></pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
            In the output of this command you should see the server's certificate:
            <pre><code class="language-text">-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com</code></pre>
            If the certificate does not show up or if there are any other error messages, then your keystore is not set up properly.</li>

        <li><h4 class="anchor-heading"><a id="security_configclients" class="anchor-link"></a><a href="#security_configclients">Configuring Kafka Clients</a></h4>
            SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported. The configs for SSL are the same for both producer and consumer.<br>
            If client authentication is not required in the broker, then the following is a minimal configuration example:
            <pre><code class="language-text">security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234</code></pre>

            Note: ssl.truststore.password is technically optional but highly recommended. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

            If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
            <pre><code class="language-text">ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234</code></pre>

            Other configuration settings that may also be needed, depending on your requirements and the broker configuration:
            <ol>
                <li>ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</li>
                <li>ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.</li>
                <li>ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should list at least one of the protocols configured on the broker side</li>
                <li>ssl.truststore.type=JKS</li>
                <li>ssl.keystore.type=JKS</li>
            </ol>
            <br>
            Examples using console-producer and console-consumer:
            <pre><code class="language-bash">$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</code></pre>
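            <p>The contents of the <code>client-ssl.properties</code> file passed to these commands are not shown in this section. A sketch of what it
            might contain, assuming the broker requires client authentication and reusing the example paths and passwords from this section:</p>
            <pre><code class="language-text"># client-ssl.properties (sketch; paths and passwords are placeholders)
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234
# The keystore entries are only needed when the broker requires client authentication
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234</code></pre>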
        </li>
    </ol>
    <h3 class="anchor-heading"><a id="security_sasl" class="anchor-link"></a><a href="#security_sasl">7.4 Authentication using SASL</a></h3>

    <ol>
        <li><h4 class="anchor-heading"><a id="security_sasl_jaasconfig" class="anchor-link"></a><a href="#security_sasl_jaasconfig">JAAS configuration</a></h4>
            <p>Kafka uses the Java Authentication and Authorization Service
            (<a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html">JAAS</a>)
            for SASL configuration.</p>
                <ol>
                <li><h5><a id="security_jaas_broker"
                    href="#security_jaas_broker">JAAS configuration for Kafka brokers</a></h5>

                    <p><code>KafkaServer</code> is the section name in the JAAS file used by each
                    KafkaServer/Broker. This section provides SASL configuration options
                    for the broker including any SASL client connections made by the broker
                    for inter-broker communication. If multiple listeners are configured to use
                    SASL, the section name may be prefixed with the listener name in lower-case
                    followed by a period, e.g. <code>sasl_ssl.KafkaServer</code>.</p>
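                    <p>As an illustration of the listener prefix, the following sketch of a static JAAS file assumes one listener named
                    <code>SASL_SSL</code> that uses PLAIN and any other SASL listener falling back to the unprefixed section with Kerberos.
                    The user names, keytab path and principal are example values, not requirements:</p>
                    <pre><code class="language-text">// Used only by the listener named SASL_SSL (lower-case listener name, then a period)
sasl_ssl.KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

// Used by SASL listeners that have no listener-specific section
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};</code></pre>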

                    <p>The <code>Client</code> section is used to authenticate a SASL connection with
                    ZooKeeper. It also allows the brokers to set SASL ACLs on ZooKeeper
                    nodes, which locks these nodes down so that only the brokers can
                    modify them. It is necessary to have the same principal name across all
                    brokers. If you want to use a section name other than Client, set the
                    system property <code>zookeeper.sasl.clientconfig</code> to the appropriate
                    name (<i>e.g.</i>, <code>-Dzookeeper.sasl.clientconfig=ZkClient</code>).</p>

                    <p>ZooKeeper uses "zookeeper" as the service name by default. If you
                    want to change this, set the system property
                    <code>zookeeper.sasl.client.username</code> to the appropriate name
                    (<i>e.g.</i>, <code>-Dzookeeper.sasl.client.username=zk</code>).</p>
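                    <p>A sketch of how the two system properties relate to the static JAAS file, assuming Kerberos is used towards ZooKeeper and that the
                    ZooKeeper servers run under a principal whose service name is <code>zk</code>. The keytab path and principal are example values:</p>
                    <pre><code class="language-text">// Broker JVM parameters that match this file:
//   -Dzookeeper.sasl.clientconfig=ZkClient
//   -Dzookeeper.sasl.client.username=zk

// ZooKeeper client section, renamed from the default "Client"
ZkClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};</code></pre>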

                    <p>Brokers may also configure JAAS using the broker configuration property <code>sasl.jaas.config</code>.
                    The property name must be prefixed with the listener prefix including the SASL mechanism,
                    i.e. <code>listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config</code>. Only one
                    login module may be specified in the config value. If multiple mechanisms are configured on a
                    listener, configs must be provided for each mechanism using the listener and mechanism prefix.
                    For example,</p>
                    <pre><code class="language-text">listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_alice="alice-secret";</code></pre>

                    If JAAS configuration is defined at different levels, the order of precedence used is:
                    <ul>
                        <li>Broker configuration property <code>listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config</code></li>
                        <li><code>{listenerName}.KafkaServer</code> section of static JAAS configuration</li>
                        <li><code>KafkaServer</code> section of static JAAS configuration</li>
                    </ul>
                    Note that ZooKeeper JAAS config may only be configured using static JAAS configuration.

                    <p>See <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
                        <a href="#security_sasl_plain_brokerconfig">PLAIN</a>,
                        <a href="#security_sasl_scram_brokerconfig">SCRAM</a> or
                        <a href="#security_sasl_oauthbearer_brokerconfig">OAUTHBEARER</a> for example broker configurations.</p></li>

                <li><h5><a id="security_jaas_client"
                           href="#security_jaas_client">JAAS configuration for Kafka clients</a></h5>

                    <p>Clients may configure JAAS using the client configuration property
                        <a href="#security_client_dynamicjaas">sasl.jaas.config</a>
                        or using the <a href="#security_client_staticjaas">static JAAS config file</a>
                        similar to brokers.</p>

                    <ol>
                        <li><h6><a id="security_client_dynamicjaas"
                                   href="#security_client_dynamicjaas">JAAS configuration using client configuration property</a></h6>
                            <p>Clients may specify JAAS configuration as a producer or consumer property without
                                creating a physical configuration file. This mode also enables different producers
                                and consumers within the same JVM to use different credentials by specifying
                                different properties for each client. If both static JAAS configuration system property
                                <code>java.security.auth.login.config</code> and client property <code>sasl.jaas.config</code>
                                are specified, the client property will be used.</p>
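                            <p>For instance, a producer and a consumer in the same JVM could be given separate credentials through their own properties.
                                The fragments below are a sketch using the PLAIN login module; the user <i>bob</i> is hypothetical and would have to be
                                defined on the broker, and the remaining client settings (such as <code>security.protocol</code>) are omitted:</p>
                            <pre><code class="language-text"># Properties passed to the producer
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

# Properties passed to the consumer (hypothetical second user)
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="bob" \
    password="bob-secret";</code></pre>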

                            <p>See <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>,
                                <a href="#security_sasl_plain_clientconfig">PLAIN</a>,
                                <a href="#security_sasl_scram_clientconfig">SCRAM</a> or
                                <a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a> for example configurations.</p></li>

                        <li><h6 class="anchor-heading"><a id="security_client_staticjaas" class="anchor-link"></a><a href="#security_client_staticjaas">JAAS configuration using static config file</a></h6>
                            To configure SASL authentication on the clients using static JAAS config file:
                            <ol>
                                <li>Add a JAAS config file with a client login section named <code>KafkaClient</code>. Configure
                                    a login module in <code>KafkaClient</code> for the selected mechanism as described in the examples
                                    for setting up <a href="#security_sasl_kerberos_clientconfig">GSSAPI (Kerberos)</a>,
                                    <a href="#security_sasl_plain_clientconfig">PLAIN</a>,
                                    <a href="#security_sasl_scram_clientconfig">SCRAM</a> or
                                    <a href="#security_sasl_oauthbearer_clientconfig">OAUTHBEARER</a>.
                                    For example, <a href="#security_sasl_kerberos_clientconfig">GSSAPI</a>
                                    credentials may be configured as:
                                    <pre><code class="language-text">KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client-1@EXAMPLE.COM";
};</code></pre>
                                </li>
                                <li>Pass the JAAS config file location as a JVM parameter to each client JVM. For example:
                                    <pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</code></pre></li>
                            </ol>
                        </li>
                    </ol>
                </li>
            </ol>
        </li>
        <li><h4><a id="security_sasl_config"
                   href="#security_sasl_config">SASL configuration</a></h4>

            <p>SASL may be used with PLAINTEXT or SSL as the transport layer using the
                security protocol SASL_PLAINTEXT or SASL_SSL respectively. If SASL_SSL is
                used, then <a href="#security_ssl">SSL must also be configured</a>.</p>

            <ol>
                <li><h5><a id="security_sasl_mechanism"
                           href="#security_sasl_mechanism">SASL mechanisms</a></h5>
                    Kafka supports the following SASL mechanisms:
                    <ul>
                        <li><a href="#security_sasl_kerberos">GSSAPI</a> (Kerberos)</li>
                        <li><a href="#security_sasl_plain">PLAIN</a></li>
                        <li><a href="#security_sasl_scram">SCRAM-SHA-256</a></li>
                        <li><a href="#security_sasl_scram">SCRAM-SHA-512</a></li>
                        <li><a href="#security_sasl_oauthbearer">OAUTHBEARER</a></li>
                    </ul>
                </li>
                <li><h5><a id="security_sasl_brokerconfig"
                           href="#security_sasl_brokerconfig">SASL configuration for Kafka brokers</a></h5>
                    <ol>
                        <li>Configure a SASL port in server.properties, by adding at least one of
                            SASL_PLAINTEXT or SASL_SSL to the <i>listeners</i> parameter, which
                            contains one or more comma-separated values:
                            <pre><code class="language-text">listeners=SASL_PLAINTEXT://host.name:port</code></pre>
                            If you are only configuring a SASL port (or if you want
                            the Kafka brokers to authenticate each other using SASL) then make sure
                            you set the same SASL protocol for inter-broker communication:
                            <pre><code class="language-text">security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)</code></pre></li>
                        <li>Select one or more  <a href="#security_sasl_mechanism">supported mechanisms</a>
                            to enable in the broker and follow the steps to configure SASL for the mechanism.
                            To enable multiple mechanisms in the broker, follow the steps
                            <a href="#security_sasl_multimechanism">here</a>.</li>
                    </ol>
                </li>
                <li><h5><a id="security_sasl_clientconfig"
                           href="#security_sasl_clientconfig">SASL configuration for Kafka clients</a></h5>
                    <p>SASL authentication is only supported for the new Java Kafka producer and
                        consumer; the older API is not supported.</p>

                    <p>To configure SASL authentication on the clients, select a SASL
                        <a href="#security_sasl_mechanism">mechanism</a> that is enabled in
                        the broker for client authentication and follow the steps to configure SASL
                        for the selected mechanism.</p>

                    <p>Note: When establishing connections to brokers via SASL, clients may perform a reverse 
                        DNS lookup of the broker address. Due to how the JRE implements reverse 
                        DNS lookups, clients may observe slow SASL handshakes if fully qualified domain 
                        names are not used, for both the client's <code>bootstrap.servers</code> and a broker's
                        <code><a href="#brokerconfigs_advertised.listeners">advertised.listeners</a></code>.</p></li>
            </ol>
        </li>
        <li><h4><a id="security_sasl_kerberos" href="#security_sasl_kerberos">Authentication using SASL/Kerberos</a></h4>
            <ol>
                <li><h5 class="anchor-heading"><a id="security_sasl_kerberos_prereq" class="anchor-link"></a><a href="#security_sasl_kerberos_prereq">Prerequisites</a></h5>
                    <ol>
                        <li><b>Kerberos</b><br>
                            If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one; your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (<a href="https://help.ubuntu.com/community/Kerberos">Ubuntu</a>, <a href="https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Managing_Smart_Cards/installing-kerberos.html">Red Hat</a>). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security.</li>
                        <li><b>Create Kerberos Principals</b><br>
                            If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).<br>
                            If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:
                            <pre><code class="language-bash">$ sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
$ sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"</code></pre></li>
                        <li><b>Make sure all hosts are reachable using hostnames</b> - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.</li>
                    </ol>
                <li><h5 class="anchor-heading"><a id="security_sasl_kerberos_brokerconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_brokerconfig">Configuring Kafka Brokers</a></h5>
                    <ol>
                        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory (called kafka_server_jaas.conf in this example; note that each broker should have its own keytab):
                            <pre><code class="language-text">KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};

// Zookeeper client authentication
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};</code></pre>

                            The <code>KafkaServer</code> section in the JAAS file tells the broker which principal to use and the location of the keytab where this principal is stored. It
                            allows the broker to log in using the keytab specified in this section. See <a href="#security_jaas_broker">notes</a> for more details on ZooKeeper SASL configuration.
                        </li>
                        <li>Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
                            <pre><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre>
                        </li>
                        <li>Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.</li>
                        <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
                            <pre><code class="language-text">listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI</code></pre>
                            We must also configure the service name in server.properties, which should match the primary part of the principal name of the Kafka brokers (the part before the slash). In the above example, the principal is "kafka/kafka1.hostname.com@EXAMPLE.COM", so:
                            <pre><code class="language-text">sasl.kerberos.service.name=kafka</code></pre>
                        </li>
                    </ol></li>
                <li><h5 class="anchor-heading"><a id="security_sasl_kerberos_clientconfig" class="anchor-link"></a><a href="#security_sasl_kerberos_clientconfig">Configuring Kafka Clients</a></h5>
                    To configure SASL authentication on the clients:
                    <ol>
                        <li>
                            Clients (producers, consumers, Connect workers, etc.) will authenticate to the cluster with their
                            own principal (usually with the same name as the user running the client), so obtain or create
                            these principals as needed. Then configure the JAAS configuration property for each client.
                            Different clients within a JVM may run as different users by specifying different principals.
                            The property <code>sasl.jaas.config</code> in producer.properties or consumer.properties describes
                            how clients like producer and consumer can connect to the Kafka Broker. The following is an example
                            configuration for a client using a keytab (recommended for long-running processes):
                            <pre><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true  \
    keyTab="/etc/security/keytabs/kafka_client.keytab" \
    principal="kafka-client-1@EXAMPLE.COM";</code></pre>

                            For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used
                            along with "useTicketCache=true" as in:
                            <pre><code class="language-text">sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useTicketCache=true;</code></pre>

                            JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
                            as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                            <code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</li>
                        <li>Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting the Kafka client.</li>
                        <li>Optionally pass the krb5 file locations as JVM parameters to each client JVM (see <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html">here</a> for more details):
                            <pre><code class="language-bash">-Djava.security.krb5.conf=/etc/kafka/krb5.conf</code></pre></li>
                        <li>Configure the following properties in producer.properties or consumer.properties:
                            <pre><code class="language-text">security.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka</code></pre></li>
                    </ol>
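                    <p>Taken together, the client-side steps above might result in a properties file along the following lines. This is a sketch that
                    assumes a keytab-based login over SASL_PLAINTEXT; the keytab path and principal are the example values used in this section:</p>
                    <pre><code class="language-text"># Transport and mechanism
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
# Must match the service name configured on the brokers
sasl.kerberos.service.name=kafka
# Keytab login for this client
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="/etc/security/keytabs/kafka_client.keytab" \
    principal="kafka-client-1@EXAMPLE.COM";</code></pre>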
                </li>
            </ol>
        </li>

        <li><h4><a id="security_sasl_plain" href="#security_sasl_plain">Authentication using SASL/PLAIN</a></h4>
            <p>SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication.
                Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described <a href="#security_sasl_plain_production">here</a>.</p>
            Under the default implementation of <code>principal.builder.class</code>, the username is used as the authenticated <code>Principal</code> for configuration of ACLs etc.
            <ol>
                <li><h5 class="anchor-heading"><a id="security_sasl_plain_brokerconfig" class="anchor-link"></a><a href="#security_sasl_plain_brokerconfig">Configuring Kafka Brokers</a></h5>
                    <ol>
                        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory (called kafka_server_jaas.conf in this example):
                            <pre><code class="language-text">KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};</code></pre>
                            This configuration defines two users (<i>admin</i> and <i>alice</i>). The properties <code>username</code> and <code>password</code>
                            in the <code>KafkaServer</code> section are used by the broker to initiate connections to other brokers. In this example,
                            <i>admin</i> is the user for inter-broker communication. The set of properties <code>user_<i>userName</i></code> defines
                            the passwords for all users that connect to the broker and the broker validates all client connections including
                            those from other brokers using these properties.</li>
                        <li>Pass the JAAS config file location as a JVM parameter to each Kafka broker:
                            <pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
                        <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
                            <pre><code class="language-text">listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN</code></pre></li>
                    </ol>
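                    <p>As an alternative to the static JAAS file, the same users can be declared with the listener-prefixed <code>sasl.jaas.config</code>
                    broker property described under <a href="#security_jaas_broker">JAAS configuration for Kafka brokers</a>. The following sketch of
                    server.properties assumes the listener is named <code>SASL_SSL</code>; the SSL paths and passwords are placeholders. ZooKeeper JAAS
                    configuration, if used, still has to come from a static file.</p>
                    <pre><code class="language-text">listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# Inline JAAS configuration for the PLAIN mechanism on the SASL_SSL listener
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret" \
    user_alice="alice-secret";
# SASL_SSL also needs the SSL settings (placeholder values)
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234</code></pre>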
                </li>

                <li><h5 class="anchor-heading"><a id="security_sasl_plain_clientconfig" class="anchor-link"></a><a href="#security_sasl_plain_clientconfig">Configuring Kafka Clients</a></h5>
                    To configure SASL authentication on the clients:
                    <ol>
                        <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                            The login module describes how clients such as the producer and consumer can connect to the Kafka broker.
                            The following is an example configuration for a client using the PLAIN mechanism:
                            <pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";</code></pre>
                            <p>The options <code>username</code> and <code>password</code> are used by clients to configure
                                the user for client connections. In this example, clients connect to the broker as user <i>alice</i>.
                                Different clients within a JVM may connect as different users by specifying different user names
                                and passwords in <code>sasl.jaas.config</code>.</p>

                            <p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
                                as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                                <code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</p></li>
                        <li>Configure the following properties in producer.properties or consumer.properties:
                            <pre><code class="language-text">security.protocol=SASL_SSL
sasl.mechanism=PLAIN</code></pre></li>
                    </ol>
                </li>
                <li><h5><a id="security_sasl_plain_production" href="#security_sasl_plain_production">Use of SASL/PLAIN in production</a></h5>
                    <ul>
                        <li>SASL/PLAIN should be used only with SSL as the transport layer to ensure that cleartext passwords are not transmitted on the wire without encryption.</li>
                        <li>The default implementation of SASL/PLAIN in Kafka specifies usernames and passwords in the JAAS configuration file as shown
                            <a href="#security_sasl_plain_brokerconfig">here</a>. From Kafka version 2.0 onwards, you can avoid storing cleartext passwords on disk
                            by configuring your own callback handlers that obtain the username and password from an external source using the configuration options
                            <code>sasl.server.callback.handler.class</code> and <code>sasl.client.callback.handler.class</code>.</li>
                        <li>In production systems, external authentication servers may implement password authentication. From Kafka version 2.0 onwards,
                            you can plug in your own callback handlers that use external authentication servers for password verification by configuring
                            <code>sasl.server.callback.handler.class</code>.</li>
                    </ul>
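                    <p>As a rough sketch of the callback handler options described above, the fragment below shows how such handlers might be declared.
                        The class names under <code>com.example</code> are hypothetical placeholders for your own implementations, and the listener name
                        <code>sasl_ssl</code> is assumed to match the <code>SASL_SSL</code> listener configured earlier. On brokers, the option is prefixed with the listener name and the
                        lower-case mechanism name:</p>
                    <pre><code class="language-text"># server.properties (broker side); the handler class is a hypothetical placeholder
listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.ExternalPlainServerCallbackHandler

# producer.properties or consumer.properties (client side); the handler class is a hypothetical placeholder
sasl.client.callback.handler.class=com.example.ExternalPlainClientCallbackHandler</code></pre>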
                </li>
            </ol>
        </li>

        <li><h4><a id="security_sasl_scram" href="#security_sasl_scram">Authentication using SASL/SCRAM</a></h4>
            <p>Salted Challenge Response Authentication Mechanism (SCRAM) is a family of SASL mechanisms that
                addresses the security concerns with traditional mechanisms that perform username/password authentication
                like PLAIN and DIGEST-MD5. The mechanism is defined in <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a>.
                Kafka supports <a href="https://tools.ietf.org/html/rfc7677">SCRAM-SHA-256</a> and SCRAM-SHA-512 which
                can be used with TLS to perform secure authentication. Under the default implementation of <code>principal.builder.class</code>, the username is used as the authenticated
                <code>Principal</code> for configuration of ACLs etc. The default SCRAM implementation in Kafka
                stores SCRAM credentials in Zookeeper and is suitable for use in Kafka installations where Zookeeper
                is on a private network. Refer to <a href="#security_sasl_scram_security">Security Considerations</a>
                for more details.</p>
            <ol>
                <li><h5 class="anchor-heading"><a id="security_sasl_scram_credentials" class="anchor-link"></a><a href="#security_sasl_scram_credentials">Creating SCRAM Credentials</a></h5>
                    <p>The SCRAM implementation in Kafka uses Zookeeper as the credential store. Credentials can be created in
                        Zookeeper using <code>kafka-configs.sh</code>. For each SCRAM mechanism enabled, credentials must be created
                        by adding a config with the mechanism name. Credentials for inter-broker communication must be created
                        before Kafka brokers are started. Client credentials may be created and updated dynamically and updated
                        credentials will be used to authenticate new connections.</p>
                    <p>Create SCRAM credentials for user <i>alice</i> with password <i>alice-secret</i>:
                    <pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice</code></pre>
                    <p>The default iteration count of 4096 is used if iterations are not specified. A random salt is created
                        and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey is stored in Zookeeper.
                        See <a href="https://tools.ietf.org/html/rfc5802">RFC 5802</a> for details on SCRAM identity and the individual fields.</p>
                    <p>The following examples also require a user <i>admin</i> for inter-broker communication which can be created using:
                    <pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin</code></pre>
                    <p>Existing credentials may be listed using the <i>--describe</i> option:
                    <pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice</code></pre>
                    <p>Credentials may be deleted for one or more SCRAM mechanisms using the <i>--alter --delete-config</i> option:
                    <pre><code class="language-bash">$ bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice</code></pre>
                </li>
                <li><h5 class="anchor-heading"><a id="security_sasl_scram_brokerconfig" class="anchor-link"></a><a href="#security_sasl_scram_brokerconfig">Configuring Kafka Brokers</a></h5>
                    <ol>
                        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
                            <pre><code class="language-text">KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};</code></pre>
                            The properties <code>username</code> and <code>password</code> in the <code>KafkaServer</code> section are used by
                            the broker to initiate connections to other brokers. In this example, <i>admin</i> is the user for
                            inter-broker communication.</li>
                        <li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
                            <pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
                        <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
                            <pre><code class="language-text">listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
                    </ol>
                </li>

                <li><h5 class="anchor-heading"><a id="security_sasl_scram_clientconfig" class="anchor-link"></a><a href="#security_sasl_scram_clientconfig">Configuring Kafka Clients</a></h5>
                    To configure SASL authentication on the clients:
                    <ol>
                        <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                            The login module describes how clients such as producers and consumers connect to the Kafka broker.
                            The following is an example configuration for a client for the SCRAM mechanisms:
                            <pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="alice" \
    password="alice-secret";</code></pre>

                            <p>The options <code>username</code> and <code>password</code> are used by clients to configure
                                the user for client connections. In this example, clients connect to the broker as user <i>alice</i>.
                                Different clients within a JVM may connect as different users by specifying different user names
                                and passwords in <code>sasl.jaas.config</code>.</p>

                            <p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
                                as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                                <code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</p></li>
                        <li>Configure the following properties in producer.properties or consumer.properties:
                            <pre><code class="language-text">security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)</code></pre></li>
                    </ol>
                </li>
                <li><h5><a id="security_sasl_scram_security" href="#security_sasl_scram_security">Security Considerations for SASL/SCRAM</a></h5>
                    <ul>
                        <li>The default implementation of SASL/SCRAM in Kafka stores SCRAM credentials in Zookeeper. This
                            is suitable for production use in installations where Zookeeper is secure and on a private network.</li>
                        <li>Kafka supports only the strong hash functions SHA-256 and SHA-512 with a minimum iteration count
                            of 4096. Strong hash functions combined with strong passwords and high iteration counts protect
                            against brute force attacks if Zookeeper security is compromised.</li>
                        <li>SCRAM should be used only with TLS encryption to prevent interception of SCRAM exchanges. This
                            protects against dictionary or brute force attacks and against impersonation if Zookeeper is compromised.</li>
                        <li>From Kafka version 2.0 onwards, the default SASL/SCRAM credential store may be overridden using custom callback handlers
                            by configuring <code>sasl.server.callback.handler.class</code> in installations where Zookeeper is not secure.</li>
                        <li>For more details on security considerations, refer to
                            <a href="https://tools.ietf.org/html/rfc5802#section-9">RFC 5802, Section 9</a>.</li>
                    </ul>
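                    <p>To illustrate the credential store override mentioned above, a broker might declare a custom server callback handler for each enabled SCRAM mechanism.
                        This is a sketch only: the class name <code>com.example.ExternalScramServerCallbackHandler</code> is a hypothetical placeholder for your own implementation, and the
                        listener name <code>sasl_ssl</code> is assumed to match the listener configured earlier.</p>
                    <pre><code class="language-text"># server.properties; the handler class is a hypothetical placeholder
listener.name.sasl_ssl.scram-sha-256.sasl.server.callback.handler.class=com.example.ExternalScramServerCallbackHandler
listener.name.sasl_ssl.scram-sha-512.sasl.server.callback.handler.class=com.example.ExternalScramServerCallbackHandler</code></pre>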
                </li>
            </ol>
        </li>

        <li><h4><a id="security_sasl_oauthbearer" href="#security_sasl_oauthbearer">Authentication using SASL/OAUTHBEARER</a></h4>
            <p>The <a href="https://tools.ietf.org/html/rfc6749">OAuth 2 Authorization Framework</a> "enables a third-party application to obtain limited access to an HTTP service,
                either on behalf of a resource owner by orchestrating an approval interaction between the resource owner and the HTTP
                service, or by allowing the third-party application to obtain access on its own behalf."  The SASL OAUTHBEARER mechanism
                enables the use of the framework in a SASL (i.e. a non-HTTP) context; it is defined in <a href="https://tools.ietf.org/html/rfc7628">RFC 7628</a>.
                The default OAUTHBEARER implementation in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>
                and is only suitable for use in non-production Kafka installations. Refer to <a href="#security_sasl_oauthbearer_security">Security Considerations</a>
                for more details.</p>
            Under the default implementation of <code>principal.builder.class</code>, the principalName of OAuthBearerToken is used as the authenticated <code>Principal</code> for configuration of ACLs etc.
            <ol>
                <li><h5 class="anchor-heading"><a id="security_sasl_oauthbearer_brokerconfig" class="anchor-link"></a><a href="#security_sasl_oauthbearer_brokerconfig">Configuring Kafka Brokers</a></h5>
                    <ol>
                        <li>Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
                            <pre><code class="language-text">KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    unsecuredLoginStringClaim_sub="admin";
};</code></pre>
                            The property <code>unsecuredLoginStringClaim_sub</code> in the <code>KafkaServer</code> section is used by
                            the broker when it initiates connections to other brokers. In this example, <i>admin</i> will appear in the
                            subject (<code>sub</code>) claim and will be the user for inter-broker communication.</li>
                        <li>Pass the JAAS config file location as JVM parameter to each Kafka broker:
                            <pre><code class="language-bash">-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf</code></pre></li>
                        <li>Configure SASL port and SASL mechanisms in server.properties as described <a href="#security_sasl_brokerconfig">here</a>. For example:
                            <pre><code class="language-text">listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER</code></pre></li>
                    </ol>
                </li>

                <li><h5 class="anchor-heading"><a id="security_sasl_oauthbearer_clientconfig" class="anchor-link"></a><a href="#security_sasl_oauthbearer_clientconfig">Configuring Kafka Clients</a></h5>
                    To configure SASL authentication on the clients:
                    <ol>
                        <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                            The login module describes how clients such as producers and consumers connect to the Kafka broker.
                            The following is an example configuration for a client for the OAUTHBEARER mechanism:
                            <pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    unsecuredLoginStringClaim_sub="alice";</code></pre>

                            <p>The option <code>unsecuredLoginStringClaim_sub</code> is used by clients to configure
                                the subject (<code>sub</code>) claim, which determines the user for client connections.
                                In this example, clients connect to the broker as user <i>alice</i>.
                                Different clients within a JVM may connect as different users by specifying different subject (<code>sub</code>)
                                claims in <code>sasl.jaas.config</code>.</p>

                            <p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
                                as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                                <code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</p></li>
                        <li>Configure the following properties in producer.properties or consumer.properties:
                            <pre><code class="language-text">security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism=OAUTHBEARER</code></pre></li>
                        <li>The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library.
                            Since it's an optional dependency, users have to configure it as a dependency via their build tool.</li>
                    </ol>
                </li>
                <li><h5><a id="security_sasl_oauthbearer_unsecured_retrieval" href="#security_sasl_oauthbearer_unsecured_retrieval">Unsecured Token Creation Options for SASL/OAUTHBEARER</a></h5>
                    <ul>
                        <li>The default implementation of SASL/OAUTHBEARER in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>.
                            While suitable only for non-production use, it does provide the flexibility to create arbitrary tokens in a DEV or TEST environment.</li>
                        <li>Here are the various supported JAAS module options on the client side (and on the broker side if OAUTHBEARER is the inter-broker protocol):
                            <table>
                                <tr>
                                    <th>JAAS Module Option for Unsecured Token Creation</th>
                                    <th>Documentation</th>
                                </tr>
                                <tr>
                                    <td><code>unsecuredLoginStringClaim_&lt;claimname&gt;="value"</code></td>
                                    <td>Creates a <code>String</code> claim with the given name and value. Any valid
                                        claim name can be specified except '<code>iat</code>' and '<code>exp</code>' (these are
                                        automatically generated).</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredLoginNumberClaim_&lt;claimname&gt;="value"</code></td>
                                    <td>Creates a <code>Number</code> claim with the given name and value. Any valid
                                        claim name can be specified except '<code>iat</code>' and '<code>exp</code>' (these are
                                        automatically generated).</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredLoginListClaim_&lt;claimname&gt;="value"</code></td>
                                    <td>Creates a <code>String List</code> claim with the given name and values parsed
                                        from the given value where the first character is taken as the delimiter. For
                                        example: <code>unsecuredLoginListClaim_fubar="|value1|value2"</code>. Any valid
                                        claim name can be specified except '<code>iat</code>' and '<code>exp</code>' (these are
                                        automatically generated).</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredLoginExtension_&lt;extensionname&gt;="value"</code></td>
                                    <td>Creates a <code>String</code> extension with the given name and value.
                                        For example: <code>unsecuredLoginExtension_traceId="123"</code>. A valid extension name
                                        is any sequence of lowercase or uppercase alphabetic characters. In addition, the "auth" extension name is reserved.
                                        A valid extension value is any combination of characters with ASCII codes 1-127.</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredLoginPrincipalClaimName</code></td>
                                    <td>Set to a custom claim name if you wish the name of the <code>String</code>
                                        claim holding the principal name to be something other than '<code>sub</code>'.</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredLoginLifetimeSeconds</code></td>
                                    <td>Set to an integer value if the token expiration is to be set to something
                                        other than the default value of 3600 seconds (which is 1 hour). The
                                        '<code>exp</code>' claim will be set to reflect the expiration time.</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredLoginScopeClaimName</code></td>
                                    <td>Set to a custom claim name if you wish the name of the <code>String</code> or
                                        <code>String List</code> claim holding any token scope to be something other than
                                        '<code>scope</code>'.</td>
                                </tr>
                            </table>
                        </li>
                    </ul>
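                    <p>As an illustration of how the options in the table combine, the client configuration below sketches an unsecured token for user <i>alice</i> with a
                        two-value scope list, a two-hour lifetime and one extension. All values are assumptions chosen for the example, and this is only appropriate in a
                        DEV or TEST environment:</p>
                    <pre><code class="language-text"># Example values only; unsecured tokens are for non-production use
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    unsecuredLoginStringClaim_sub="alice" \
    unsecuredLoginListClaim_scope="|read|write" \
    unsecuredLoginLifetimeSeconds="7200" \
    unsecuredLoginExtension_traceId="123";</code></pre>
                    <p>Here the leading <code>|</code> in the list claim is the delimiter, so the <code>scope</code> claim holds the values <code>read</code> and <code>write</code>.</p>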
                </li>
                <li><h5><a id="security_sasl_oauthbearer_unsecured_validation" href="#security_sasl_oauthbearer_unsecured_validation">Unsecured Token Validation Options for SASL/OAUTHBEARER</a></h5>
                    <ul>
                        <li>Here are the various supported JAAS module options on the broker side for <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Token</a> validation:
                            <table>
                                <tr>
                                    <th>JAAS Module Option for Unsecured Token Validation</th>
                                    <th>Documentation</th>
                                </tr>
                                <tr>
                                    <td><code>unsecuredValidatorPrincipalClaimName="value"</code></td>
                                    <td>Set to a non-empty value if you wish a particular <code>String</code> claim
                                        holding a principal name to be checked for existence; the default is to check
                                        for the existence of the '<code>sub</code>' claim.</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredValidatorScopeClaimName="value"</code></td>
                                    <td>Set to a custom claim name if you wish the name of the <code>String</code> or
                                        <code>String List</code> claim holding any token scope to be something other than
                                        '<code>scope</code>'.</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredValidatorRequiredScope="value"</code></td>
                                    <td>Set to a space-delimited list of scope values if you wish the
                                        <code>String/String List</code> claim holding the token scope to be checked to
                                        make sure it contains certain values.</td>
                                </tr>
                                <tr>
                                    <td><code>unsecuredValidatorAllowableClockSkewMs="value"</code></td>
                                    <td>Set to a positive integer value if you wish to allow up to some number of
                                        positive milliseconds of clock skew (the default is 0).</td>
                                </tr>
                            </table>
                        </li>
                        <li>The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments)
                            using custom login and SASL Server callback handlers.</li>
                        <li>For more details on security considerations, refer to <a href="https://tools.ietf.org/html/rfc6749#section-10">RFC 6749, Section 10</a>.</li>
                    </ul>
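                    <p>The validation options above might be combined in the broker's JAAS file as sketched below. The scope values and the clock skew are illustrative assumptions.
                        Because this sketch assumes OAUTHBEARER is also the inter-broker mechanism, the broker's own token is given the same scope values so that it passes the
                        required-scope check on other brokers:</p>
                    <pre><code class="language-text">// Example values only; unsecured tokens are for non-production use
KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    unsecuredLoginStringClaim_sub="admin"
    unsecuredLoginListClaim_scope="|read|write"
    unsecuredValidatorRequiredScope="read write"
    unsecuredValidatorAllowableClockSkewMs="3000";
};</code></pre>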
                </li>
                <li><h5><a id="security_sasl_oauthbearer_refresh" href="#security_sasl_oauthbearer_refresh">Token Refresh for SASL/OAUTHBEARER</a></h5>
                    Kafka periodically refreshes any token before it expires so that the client can continue to make
                    connections to brokers. The parameters that impact how the refresh algorithm
                    operates are specified as part of the producer/consumer/broker configuration
                    and are as follows. See the documentation for these properties elsewhere for
                    details.  The default values are usually reasonable, in which case these
                    configuration parameters would not need to be explicitly set.
                    <table>
                        <tr>
                            <th>Producer/Consumer/Broker Configuration Property</th>
                        </tr>
                        <tr>
                            <td><code>sasl.login.refresh.window.factor</code></td>
                        </tr>
                        <tr>
                            <td><code>sasl.login.refresh.window.jitter</code></td>
                        </tr>
                        <tr>
                            <td><code>sasl.login.refresh.min.period.seconds</code></td>
                        </tr>
                        <tr>
                            <td><code>sasl.login.refresh.buffer.seconds</code></td>
                        </tr>
                    </table>
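                    <p>For illustration, the fragment below sets each refresh property explicitly in a producer, consumer or broker properties file. The values shown are
                        believed to be the defaults, so setting them this way should be equivalent to leaving them unset. Check the configuration reference for your Kafka version
                        before relying on them.</p>
                    <pre><code class="language-text"># Values are assumed defaults; verify against the configuration reference for your version
# Refresh once this fraction of the token lifetime has elapsed
sasl.login.refresh.window.factor=0.8
# Random jitter, as a fraction of the lifetime, added to the refresh time
sasl.login.refresh.window.jitter=0.05
# Minimum time to wait between refreshes, in seconds
sasl.login.refresh.min.period.seconds=60
# Buffer to keep before token expiry, in seconds
sasl.login.refresh.buffer.seconds=300</code></pre>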
                </li>
                <li><h5><a id="security_sasl_oauthbearer_prod" href="#security_sasl_oauthbearer_prod">Secure/Production Use of SASL/OAUTHBEARER</a></h5>
                    Production use cases will require writing an implementation of
                    <code>org.apache.kafka.common.security.auth.AuthenticateCallbackHandler</code> that can handle an instance of
                    <code>org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback</code> and declaring it via either the
                    <code>sasl.login.callback.handler.class</code> configuration option for a
                    non-broker client or via the
                    <code>listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class</code>
                    configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker
                    protocol).
                    <p>
                        Production use cases will also require writing an implementation of
                        <code>org.apache.kafka.common.security.auth.AuthenticateCallbackHandler</code> that can handle an instance of
                        <code>org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback</code> and declaring it via the
                        <code>listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class</code>
                        broker configuration option.
                    </p>
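                    <p>The declarations described in this section could look roughly like the following. The handler class names under <code>com.example</code> are hypothetical
                        placeholders for your own implementations, and the listener name <code>sasl_ssl</code> is assumed to match the broker's listener configuration:</p>
                    <pre><code class="language-text"># producer.properties or consumer.properties; the handler class is a hypothetical placeholder
sasl.login.callback.handler.class=com.example.OAuthLoginCallbackHandler

# server.properties; the handler classes are hypothetical placeholders
# The login handler is needed only when SASL/OAUTHBEARER is the inter-broker protocol
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=com.example.OAuthLoginCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=com.example.OAuthValidatorCallbackHandler</code></pre>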
                </li>
                <li><h5><a id="security_sasl_oauthbearer_security" href="#security_sasl_oauthbearer_security">Security Considerations for SASL/OAUTHBEARER</a></h5>
                    <ul>
                        <li>The default implementation of SASL/OAUTHBEARER in Kafka creates and validates <a href="https://tools.ietf.org/html/rfc7515#appendix-A.5">Unsecured JSON Web Tokens</a>.
                            This is suitable only for non-production use.</li>
                        <li>OAUTHBEARER should be used in production environments only with TLS encryption to prevent interception of tokens.</li>
                        <li>The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments)
                            using custom login and SASL Server callback handlers as described above.</li>
                        <li>For more details on OAuth 2 security considerations in general, refer to <a href="https://tools.ietf.org/html/rfc6749#section-10">RFC 6749, Section 10</a>.</li>
                    </ul>
                </li>
            </ol>
        </li>

        <li><h4 class="anchor-heading"><a id="security_sasl_multimechanism" class="anchor-link"></a><a href="#security_sasl_multimechanism">Enabling multiple SASL mechanisms in a broker</a></h4>
            <ol>
                <li>Specify configuration for the login modules of all enabled mechanisms in the <code>KafkaServer</code> section of the JAAS config file. For example:
                    <pre><code class="language-text">KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};</code></pre></li>
                <li>Enable the SASL mechanisms in server.properties: <pre><code class="language-text">sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER</code></pre></li>
                <li>Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required:
                    <pre><code class="language-text">security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)</code></pre></li>
                <li>Follow the mechanism-specific steps in <a href="#security_sasl_kerberos_brokerconfig">GSSAPI (Kerberos)</a>,
                    <a href="#security_sasl_plain_brokerconfig">PLAIN</a>,
                    <a href="#security_sasl_scram_brokerconfig">SCRAM</a> and <a href="#security_sasl_oauthbearer_brokerconfig">OAUTHBEARER</a>
                    to configure SASL for the enabled mechanisms.</li>
            </ol>
        </li>
        <li><h4 class="anchor-heading"><a id="saslmechanism_rolling_upgrade" class="anchor-link"></a><a href="#saslmechanism_rolling_upgrade">Modifying SASL mechanism in a Running Cluster</a></h4>
            <p>The SASL mechanism can be modified in a running cluster using the following sequence:</p>
            <ol>
                <li>Enable the new SASL mechanism by adding it to <code>sasl.enabled.mechanisms</code> in server.properties for each broker. Update the JAAS config file to include both
                    mechanisms as described <a href="#security_sasl_multimechanism">here</a>. Incrementally bounce the cluster nodes.</li>
                <li>Restart clients using the new mechanism.</li>
                <li>To change the mechanism of inter-broker communication (if this is required), set <code>sasl.mechanism.inter.broker.protocol</code> in server.properties to the new mechanism and
                    incrementally bounce the cluster again.</li>
                <li>To remove the old mechanism (if this is required), remove it from <code>sasl.enabled.mechanisms</code> in server.properties and remove the entries for the
                    old mechanism from the JAAS config file. Incrementally bounce the cluster again.</li>
            </ol>
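            <p>As a rough sketch of the sequence above, assuming a cluster that is moving from GSSAPI to SCRAM-SHA-256 (both mechanism choices are only illustrative), the relevant
                server.properties entries might look as follows at each stage. These are three successive versions of the same file, and the JAAS changes described in the steps are not shown.</p>
            <pre><code class="language-text"># Step 1: enable the new mechanism alongside the old one, then bounce each broker
sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=GSSAPI

# Step 3 (optional): switch inter-broker communication to the new mechanism, then bounce again
sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# Step 4 (optional): drop the old mechanism, then bounce again
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256</code></pre>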
        </li>

        <li><h4 class="anchor-heading"><a id="security_delegation_token" class="anchor-link"></a><a href="#security_delegation_token">Authentication using Delegation Tokens</a></h4>
            <p>Delegation token based authentication is a lightweight authentication mechanism to complement existing SASL/SSL
                methods. Delegation tokens are shared secrets between Kafka brokers and clients. Delegation tokens help processing
                frameworks distribute the workload to available workers in a secure environment without the added cost of distributing
                Kerberos TGT/keytabs or keystores when 2-way SSL is used. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka">KIP-48</a>
                for more details.</p>
            <p>Under the default implementation of <code>principal.builder.class</code>, the owner of the delegation token is used as the authenticated <code>Principal</code> for configuration of ACLs etc.</p>

            <p>Typical steps for delegation token usage are:</p>
            <ol>
                <li>The user authenticates with the Kafka cluster via SASL or SSL, and obtains a delegation token. This can be done
                    using the Admin APIs or the <code>kafka-delegation-tokens.sh</code> script.</li>
                <li>The user securely passes the delegation token to Kafka clients for authenticating with the Kafka cluster.</li>
                <li>The token owner or renewer can renew or expire the delegation tokens.</li>
            </ol>

            <ol>
                <li><h5 class="anchor-heading"><a id="security_token_management" class="anchor-link"></a><a href="#security_token_management">Token Management</a></h5>
                    <p>A secret is used to generate and verify delegation tokens. This is supplied using the config
                        option <code>delegation.token.secret.key</code>. The same secret key must be configured across all the brokers.
                        When using Kafka with KRaft, the controllers must also be configured with the secret using the same config option.
                        If the secret is not set or is set to an empty string, delegation token authentication and API operations will fail.</p>

                    <p>When using Kafka with Zookeeper, the token details are stored in Zookeeper and delegation tokens are suitable
                        for use in Kafka installations where Zookeeper is on a private network. When using Kafka with KRaft, the token
                        details are stored with the other metadata on the controller nodes and delegation tokens are suitable for use
                        when the controllers are on a private network or when all communications between brokers and controllers are
                        encrypted. Currently, the secret is stored as plain text in the server.properties config file.
                        We intend to make this configurable in a future Kafka release.</p>

                    <p>A token has a current life, and a maximum renewable life. By default, tokens must be renewed once every 24 hours
                        for up to 7 days. These can be configured using <code>delegation.token.expiry.time.ms</code>
                        and <code>delegation.token.max.lifetime.ms</code> config options.</p>

                    <p>Tokens can also be cancelled explicitly. If a token is not renewed by its expiration time or if it
                        is beyond its maximum lifetime, it will be deleted from all broker caches as well as from Zookeeper.</p>
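                    <p>For illustration, a broker configuration that spells out the default lifetimes described above might look like the following sketch. The secret value is a placeholder
                        (an assumption, not a recommended value); 86400000 ms is 24 hours and 604800000 ms is 7 days.</p>
                    <pre><code class="language-text"># server.properties: the same secret must be set on every broker (and on the controllers when using KRaft)
delegation.token.secret.key=replace-with-a-long-random-secret
# A token must be renewed within 24 hours (24 * 60 * 60 * 1000 ms)
delegation.token.expiry.time.ms=86400000
# A token cannot be renewed beyond 7 days (7 * 24 * 60 * 60 * 1000 ms)
delegation.token.max.lifetime.ms=604800000</code></pre>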
                </li>

                <li><h5 class="anchor-heading"><a id="security_sasl_create_tokens" class="anchor-link"></a><a href="#security_sasl_create_tokens">Creating Delegation Tokens</a></h5>
                    <p>Tokens can be created using the Admin APIs or the <code>kafka-delegation-tokens.sh</code> script.
                        Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels.
                        Tokens cannot be requested if the initial authentication was done through a delegation token.
                        A user can create a token for themselves, or for another user by specifying the <code>--owner-principal</code> parameter.
                        Owners and renewers can renew or expire tokens, and can always describe their own tokens.
                        To describe other tokens, the DescribeTokens permission must be granted on the User resource representing the owner of the token.
                        Examples using the <code>kafka-delegation-tokens.sh</code> script are given below.</p>
                    <p>Create a delegation token:</p>
                    <pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1</code></pre>
                    <p>Create a delegation token for a different owner:</p>
                    <pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1</code></pre>
                    <p>Renew a delegation token:</p>
                    <pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK</code></pre>
                    <p>Expire a delegation token:</p>
                    <pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK</code></pre>
                    <p>Existing tokens can be described using the --describe option:</p>
                    <pre><code class="language-bash">$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties --owner-principal User:user1</code></pre>
                </li>
                <li><h5 class="anchor-heading"><a id="security_token_authentication" class="anchor-link"></a><a href="#security_token_authentication">Token Authentication</a></h5>
                    <p>Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. The
                        SASL/SCRAM mechanism must be enabled on the Kafka cluster as described <a href="#security_sasl_scram">here</a>.</p>

                    <p>Configuring Kafka Clients:</p>
                    <ol>
                        <li>Configure the JAAS configuration property for each client in producer.properties or consumer.properties.
                            The login module describes how clients such as producers and consumers can connect to the Kafka broker.
                            The following is an example client configuration for token authentication:
                            <pre><code class="language-text">sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="tokenID123" \
    password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
    tokenauth="true";</code></pre>

                            <p>The options <code>username</code> and <code>password</code> are used by clients to configure the token id and
                                token HMAC. The option <code>tokenauth</code> tells the server that token authentication is being used.
                                In this example, clients connect to the broker using token id: <i>tokenID123</i>. Different clients within a
                                JVM may connect using different tokens by specifying different token details in <code>sasl.jaas.config</code>.</p>

                            <p>JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers
                                as described <a href="#security_client_staticjaas">here</a>. Clients use the login section named
                                <code>KafkaClient</code>. This option allows only one user for all client connections from a JVM.</p></li>
                    </ol>
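                    <p>Putting this together, a complete client properties file for token authentication might look like the sketch below. The choice of <code>SASL_SSL</code> and
                        <code>SCRAM-SHA-256</code> is an assumption; use whichever SASL security protocol and SCRAM mechanism are enabled on the brokers. The token id and HMAC are the
                        placeholder values from the example above.</p>
                    <pre><code class="language-text"># client.properties (illustrative values)
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="tokenID123" \
    password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
    tokenauth="true";</code></pre>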
                </li>

                <li><h5><a id="security_token_secret_rotation" href="#security_token_secret_rotation">Procedure to manually rotate the secret:</a></h5>
                    <p>Rotating the secret requires a re-deployment. During this process, already connected clients
                        will continue to work, but any new connection requests and renew/expire requests with old tokens can fail. The steps are given below.</p>

                    <ol>
                        <li>Expire all existing tokens.</li>
                        <li>Rotate the secret with a rolling upgrade.</li>
                        <li>Generate new tokens.</li>
                    </ol>
                    <p>We intend to automate this in a future Kafka release.</p>
                </li>
            </ol>
        </li>
    </ol>

    <h3 class="anchor-heading"><a id="security_authz" class="anchor-link"></a><a href="#security_authz">7.5 Authorization and ACLs</a></h3>
    Kafka ships with a pluggable authorization framework, which is configured with the <code>authorizer.class.name</code> property in the server configuration.
    Configured implementations must extend <code>org.apache.kafka.server.authorizer.Authorizer</code>.
    Kafka provides default implementations which store ACLs in the cluster metadata (either Zookeeper or the KRaft metadata log).

    For Zookeeper-based clusters, the provided implementation is configured as follows:
    <pre><code class="language-text">authorizer.class.name=kafka.security.authorizer.AclAuthorizer</code></pre>
    For KRaft clusters, use the following configuration on all nodes (brokers, controllers, or combined broker/controller nodes):
    <pre><code class="language-text">authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer</code></pre>

    Kafka ACLs are defined in the general format of "Principal {P} is [Allowed|Denied] Operation {O} From Host {H} on any Resource {R} matching ResourcePattern {RP}".
    You can read more about the ACL structure in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface">KIP-11</a> and
    resource patterns in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>.
    In order to add, remove, or list ACLs, you can use the Kafka ACL CLI <code>kafka-acls.sh</code>. By default, if no ResourcePatterns match a specific Resource R,
    then R has no associated ACLs, and therefore no one other than super users is allowed to access R.
    If you want to change that behavior, you can include the following in server.properties.
    <pre><code class="language-text">allow.everyone.if.no.acl.found=true</code></pre>
    One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
    <pre><code class="language-text">super.users=User:Bob;User:Alice</code></pre>
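    Because entries are separated by semicolons, a full SSL distinguished name can be listed as a super user without escaping its commas. In the following sketch the distinguished name is hypothetical:
    <pre><code class="language-text"># The first entry is an SSL distinguished name, the second a short user name
super.users=User:CN=kafka-admin,OU=Ops,O=Example,L=Unknown,ST=Unknown,C=Unknown;User:Alice</code></pre>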

    <h5 class="anchor-heading"><a id="kraft_principal_forwarding" class="anchor-link"></a><a href="#kraft_principal_forwarding">KRaft Principal Forwarding</a></h5>

    In KRaft clusters, admin requests such as <code>CreateTopics</code> and <code>DeleteTopics</code> are sent to the broker listeners by the client. The broker then forwards the request to the active controller through the first listener configured in <code>controller.listener.names</code>.
    Authorization of these requests is done on the controller node. This is achieved by way of an <code>Envelope</code> request which packages both the underlying request from the client as well as the client principal.
    When the controller receives the forwarded <code>Envelope</code> request from the broker, it first authorizes the <code>Envelope</code> request using the authenticated broker principal.
    Then it authorizes the underlying request using the forwarded principal.

    <br>All of this implies that Kafka must understand how to serialize and deserialize the client principal. The authentication framework allows for customized principals by overriding the <code>principal.builder.class</code> configuration.
    In order for customized principals to work with KRaft, the configured class must implement <code>org.apache.kafka.common.security.auth.KafkaPrincipalSerde</code> so that Kafka knows how to serialize and deserialize the principals.
    The default implementation <code>org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder</code> uses the Kafka RPC format defined in the source code: <code>clients/src/main/resources/common/message/DefaultPrincipalData.json</code>.

    For more detail about request forwarding in KRaft, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller">KIP-590</a>.
    
    <h5 class="anchor-heading"><a id="security_authz_ssl" class="anchor-link"></a><a href="#security_authz_ssl">Customizing SSL User Name</a></h5>

    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting <code>ssl.principal.mapping.rules</code> to a customized rule in server.properties.
    This config allows a list of rules for mapping an X.500 distinguished name to a short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.

    <br>The format of <code>ssl.principal.mapping.rules</code> is a list where each rule starts with "RULE:" and contains an expression in one of the following formats. The default rule returns
    the string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name.
    Rules also support lowercase/uppercase options, which force the translated result to be all lowercase or all uppercase. This is done by adding "/L" or "/U" to the end of the rule.

    <pre><code class="language-text">RULE:pattern/replacement/
RULE:pattern/replacement/[LU]</code></pre>

    Example <code>ssl.principal.mapping.rules</code> values are:
    <pre><code class="language-text">RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT</code></pre>

    The above rules translate the distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser"
    and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
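    <br>As a sketch of how this list could be written in server.properties, the example values can be joined into a single property, here using properties-file line continuations (a trailing backslash). The third distinguished name in the comments is hypothetical, and its result is worked out by hand from the rules rather than taken from a running broker:
    <pre><code class="language-text">ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,\
    RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,\
    RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,\
    DEFAULT

# CN=serviceuser,OU=ServiceUsers,O=Unknown,...  matches rule 1, giving serviceuser
# CN=adminUser,OU=Admin,O=Unknown,...           matches rule 2, giving adminuser@admin
# CN=Other,O=Example                            matches only rule 3, giving other</code></pre>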

    <br>For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
    <pre><code class="language-text">principal.builder.class=CustomizedPrincipalBuilderClass</code></pre>

    <h5 class="anchor-heading"><a id="security_authz_sasl" class="anchor-link"></a><a href="#security_authz_sasl">Customizing SASL User Name</a></h5>

    By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in server.properties.
    The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as auth_to_local in the <a href="https://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. Rules also support an additional lowercase/uppercase option, which forces the translated result to be all lowercase or all uppercase. This is done by adding "/L" or "/U" to the end of the rule.
    Each rule starts with RULE: and contains an expression in one of the following formats. See the Kerberos documentation for more details.
    <pre><code class="language-text">RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L
RULE:[n:string](regexp)s/pattern/replacement//U
RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>

    An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
    <pre><code class="language-text">sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT</code></pre>
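    As a further sketch, the same rule can be combined with the "/L" option so that the translated name is also lowercased. Worked by hand from the formats above, this would map User@MYDOMAIN.COM to user:
    <pre><code class="language-text"># The replacement part is empty, so the sed expression "s/@.*//" is followed directly by "/L"
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*///L,DEFAULT</code></pre>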

    <h4 class="anchor-heading"><a id="security_authz_cli" class="anchor-link"></a><a href="#security_authz_cli">Command Line Interface</a></h4>
    The Kafka authorization management CLI can be found under the bin directory with all the other CLIs. The CLI script is called <b>kafka-acls.sh</b>. The following table lists all the options that the script supports:
    <p></p>
    <table class="data-table">
        <tr>
            <th>Option</th>
            <th>Description</th>
            <th>Default</th>
            <th>Option type</th>
        </tr>
        <tr>
            <td>--add</td>
            <td>Indicates to the script that user is trying to add an acl.</td>
            <td></td>
            <td>Action</td>
        </tr>
        <tr>
            <td>--remove</td>
            <td>Indicates to the script that user is trying to remove an acl.</td>
            <td></td>
            <td>Action</td>
        </tr>
        <tr>
            <td>--list</td>
            <td>Indicates to the script that user is trying to list acls.</td>
            <td></td>
            <td>Action</td>
        </tr>
        <tr>
            <td>--bootstrap-server</td>
            <td>A list of host/port pairs to use for establishing the connection to the Kafka cluster. Exactly one of the --bootstrap-server or --authorizer options must be specified.</td>
            <td></td>
            <td>Configuration</td>
        </tr>
        <tr>
            <td>--command-config</td>
            <td>A property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option.</td>
            <td></td>
            <td>Configuration</td>
        </tr>
        <tr>
            <td>--cluster</td>
            <td>Indicates to the script that the user is trying to interact with acls on the singular cluster resource.</td>
            <td></td>
            <td>ResourcePattern</td>
        </tr>
        <tr>
            <td>--topic [topic-name]</td>
            <td>Indicates to the script that the user is trying to interact with acls on topic resource pattern(s).</td>
            <td></td>
            <td>ResourcePattern</td>
        </tr>
        <tr>
            <td>--group [group-name]</td>
            <td>Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s)</td>
            <td></td>
            <td>ResourcePattern</td>
        </tr>
        <tr>
            <td>--transactional-id [transactional-id]</td>
            <td>The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds.</td>
            <td></td>
            <td>ResourcePattern</td>
        </tr>
        <tr>
            <td>--delegation-token [delegation-token]</td>
            <td>Delegation token to which ACLs should be added or removed. A value of * indicates ACL should apply to all tokens.</td>
            <td></td>
            <td>ResourcePattern</td>
        </tr>
        <tr>
            <td>--user-principal [user-principal]</td>
            <td>A user resource to which ACLs should be added or removed. This is currently supported in relation to delegation tokens.
                A value of * indicates ACL should apply to all users.</td>
            <td></td>
            <td>ResourcePattern</td>
        </tr>
        <tr>
            <td>--resource-pattern-type [pattern-type]</td>
            <td>Indicates to the script the type of resource pattern, (for --add), or resource pattern filter, (for --list and --remove), the user wishes to use.<br>
                When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'.<br>
                When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern,
                or the filter values of 'any' or 'match' can be used, where 'any' will match any pattern type, but will match the resource name exactly,
                and 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s).<br>
                WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.
            </td>
            <td>literal</td>
            <td>Configuration</td>
        </tr>
        <tr>
            <td>--allow-principal</td>
            <td>Principal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string "User" is case sensitive. <br>You can specify multiple --allow-principal in a single command.</td>
            <td></td>
            <td>Principal</td>
        </tr>
        <tr>
            <td>--deny-principal</td>
            <td>Principal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string "User" is case sensitive. <br>You can specify multiple --deny-principal in a single command.</td>
            <td></td>
            <td>Principal</td>
        </tr>
        <tr>
            <td>--principal</td>
            <td>Principal is in PrincipalType:name format that will be used along with --list option. Default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. <br>You can specify multiple --principal in a single command.</td>
            <td></td>
            <td>Principal</td>
        </tr>
        <tr>
            <td>--allow-host</td>
            <td>IP address from which principals listed in --allow-principal will have access.</td>
            <td> if --allow-principal is specified defaults to * which translates to "all hosts"</td>
            <td>Host</td>
        </tr>
        <tr>
            <td>--deny-host</td>
            <td>IP address from which principals listed in --deny-principal will be denied access.</td>
            <td>if --deny-principal is specified defaults to * which translates to "all hosts"</td>
            <td>Host</td>
        </tr>
        <tr>
            <td>--operation</td>
            <td>Operation that will be allowed or denied.<br>
                Valid values are:
                <ul>
                    <li>Read</li>
                    <li>Write</li>
                    <li>Create</li>
                    <li>Delete</li>
                    <li>Alter</li>
                    <li>Describe</li>
                    <li>ClusterAction</li>
                    <li>DescribeConfigs</li>
                    <li>AlterConfigs</li>
                    <li>IdempotentWrite</li>
                    <li>CreateTokens</li>
                    <li>DescribeTokens</li>
                    <li>All</li>
                </ul>
            </td>
            <td>All</td>
            <td>Operation</td>
        </tr>
        <tr>
            <td>--producer</td>
            <td>Convenience option to add/remove acls for the producer role. This will generate acls that allow WRITE,
                DESCRIBE and CREATE on topic.</td>
            <td></td>
            <td>Convenience</td>
        </tr>
        <tr>
            <td>--consumer</td>
            <td>Convenience option to add/remove acls for the consumer role. This will generate acls that allow READ,
                DESCRIBE on topic and READ on consumer-group.</td>
            <td></td>
            <td>Convenience</td>
        </tr>
        <tr>
            <td>--idempotent</td>
            <td>Enable idempotence for the producer. This should be used in combination with the --producer option.<br>
                Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id.
            </td>
            <td></td>
            <td>Convenience</td>
        </tr>
        <tr>
            <td>--force</td>
            <td> Convenience option to assume yes to all queries and do not prompt.</td>
            <td></td>
            <td>Convenience</td>
        </tr>
        <tr>
            <td>--authorizer</td>
            <td>(DEPRECATED: not supported in KRaft) Fully qualified class name of the authorizer.</td>
            <td>kafka.security.authorizer.AclAuthorizer</td>
            <td>Configuration</td>
        </tr>
        <tr>
            <td>--authorizer-properties</td>
            <td>(DEPRECATED: not supported in KRaft) key=val pairs that will be passed to the authorizer for initialization. For the default authorizer in ZK clusters, an example value is: zookeeper.connect=localhost:2181</td>
            <td></td>
            <td>Configuration</td>
        </tr>
        <tr>
            <td>--zk-tls-config-file</td>
            <td>(DEPRECATED: not supported in KRaft) Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined.
                Any properties other than the following (with or without an "authorizer." prefix) are ignored:
                zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable,
                zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm,
                zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type,
                zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location,
                zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type
            </td>
            <td></td>
            <td>Configuration</td>
        </tr>
    </table>

    <h4 class="anchor-heading"><a id="security_authz_examples" class="anchor-link"></a><a href="#security_authz_examples">Examples</a></h4>
    <ul>
        <li><b>Adding Acls</b><br>
            Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with the following options:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic</code></pre>
            By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In the rare case where an allow acl grants access to all principals but a few must be excluded, use the --deny-principal and --deny-host options. For example, to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can use the following command:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic</code></pre>
            Note that <code>--allow-host</code> and <code>--deny-host</code> only support IP addresses (hostnames are not supported).
            The above examples add acls to a topic by specifying --topic [topic-name] as the resource pattern option. Similarly, one can add acls to the cluster by specifying --cluster and to a consumer group by specifying --group [group-name].
            You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.1".
            You can do that by using the wildcard resource '*', e.g. by executing the CLI with the following options:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'</code></pre>
            You can add acls on prefixed resource patterns, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name starts with 'Test-' from any host".
            You can do that by executing the CLI with following options:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed</code></pre>
            Note, --resource-pattern-type defaults to 'literal', which only affects resources with the exact same name or, in the case of the wildcard resource name '*', a resource with any name.</li>

        <li><b>Removing Acls</b><br>
            Removing acls is pretty much the same. The only difference is that instead of the --add option users will have to specify the --remove option. To remove the acls added by the first example above we can execute the CLI with the following options:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic</code></pre>
            To remove the acl added to the prefixed resource pattern above we can execute the CLI with the following options:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed</code></pre></li>

        <li><b>List Acls</b><br>
            We can list acls for any resource by specifying the --list option with the resource. To list all acls on the literal resource pattern Test-topic, we can execute the CLI with following options:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic</code></pre>
            However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic,
            e.g. any acls on the topic wildcard '*', or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'</code></pre>
            However, it is not necessarily possible to explicitly query for acls on prefixed resource patterns that match Test-topic as the name of such patterns may not be known.
            We can list <i>all</i> acls affecting Test-topic by using '--resource-pattern-type match', e.g.
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic --resource-pattern-type match</code></pre>
            This will list acls on all matching literal, wildcard and prefixed resource patterns.</li>

        <li><b>Adding or removing a principal as producer or consumer</b><br>
            The most common use cases for acl management are adding/removing a principal as a producer or consumer, so we added convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic we can execute the following command:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --producer --topic Test-topic</code></pre>
            Similarly, to add User:Alice as a consumer of Test-topic with consumer group Group-1 we just have to pass the --consumer option:
            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --consumer --topic Test-topic --group Group-1</code></pre>
            Note that for the consumer option we must also specify the consumer group.
            In order to remove a principal from the producer or consumer role we just need to pass the --remove option.</li>

        <li><b>Admin API based acl management</b><br>
            Users with Alter permission on the Cluster resource can use the Admin API for ACL management. The kafka-acls.sh script uses the AdminClient API to manage ACLs without interacting with ZooKeeper or the authorizer directly.
            All the above examples can be executed by using the <b>--bootstrap-server</b> option. For example:

            <pre><code class="language-bash">$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:tokenRequester --operation CreateTokens --user-principal "owner1"</code></pre></li>

    </ul>
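    <p>The lookup performed by <code>--resource-pattern-type match</code> in the List Acls example above can be pictured with a small shell sketch. The function name <code>pattern_matches</code> is hypothetical and is not part of Kafka. It only mirrors the rule stated above that literal, wildcard and prefixed patterns can all affect a topic, and it assumes the wildcard is expressed as the literal name <code>*</code>.</p>

```shell
# Hypothetical helper, not part of Kafka: returns 0 when an acl resource
# pattern of the given type and name would apply to the given topic.
#   $1 = pattern type (literal or prefixed), $2 = pattern name, $3 = topic
pattern_matches() {
  case "$1" in
    literal)
      # The name '*' is treated as the wildcard and applies to every topic;
      # any other literal name must equal the topic name exactly.
      if [ "$2" = "*" ] || [ "$2" = "$3" ]; then return 0; fi
      return 1 ;;
    prefixed)
      # A prefixed pattern applies to every topic that starts with its name.
      case "$3" in
        "$2"*) return 0 ;;
        *) return 1 ;;
      esac ;;
    *)
      # Unknown pattern types never match in this sketch.
      return 1 ;;
  esac
}
```

    <p>For example, <code>pattern_matches prefixed Test- Test-topic</code> succeeds, while <code>pattern_matches literal Test- Test-topic</code> does not, which is why a plain <code>--list --topic Test-topic</code> does not show acls on the prefixed pattern.</p>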

    <h4 class="anchor-heading"><a id="security_authz_primitives" class="anchor-link"></a><a href="#security_authz_primitives">Authorization Primitives</a></h4>
    <p>Protocol calls usually perform operations on certain resources in Kafka. To set up effective protection,
        you need to know these operations and resources. This section lists the operations and
        resources, then lists their combinations with the protocols to show the valid scenarios.</p>
    <h5 class="anchor-heading"><a id="operations_in_kafka" class="anchor-link"></a><a href="#operations_in_kafka">Operations in Kafka</a></h5>
    <p>There are a few operation primitives that can be used to build up privileges. These can be matched up with
        certain resources to allow specific protocol calls for a given user. These are:</p>
    <ul>
        <li>Read</li>
        <li>Write</li>
        <li>Create</li>
        <li>Delete</li>
        <li>Alter</li>
        <li>Describe</li>
        <li>ClusterAction</li>
        <li>DescribeConfigs</li>
        <li>AlterConfigs</li>
        <li>IdempotentWrite</li>
        <li>CreateTokens</li>
        <li>DescribeTokens</li>
        <li>All</li>
    </ul>
    <h5 class="anchor-heading"><a id="resources_in_kafka" class="anchor-link"></a><a href="#resources_in_kafka">Resources in Kafka</a></h5>
    <p>The operations above can be applied on certain resources which are described below.</p>
    <ul>
        <li><b>Topic:</b> this simply represents a Topic. All protocol calls that act on topics (such as reading from
            or writing to them) require the corresponding privilege to be added. If there is an authorization error with a
            topic resource, then a TOPIC_AUTHORIZATION_FAILED (error code: 29) will be returned.</li>
        <li><b>Group:</b> this represents the consumer groups in the brokers. All protocol calls that work with
            consumer groups, such as joining a group, must have privileges on the group in question. If the privilege is not
            given, then a GROUP_AUTHORIZATION_FAILED (error code: 30) will be returned in the protocol response.</li>
        <li><b>Cluster:</b> this resource represents the cluster. Operations that affect the whole cluster, such as
            controlled shutdown, are protected by privileges on the Cluster resource. If there is an authorization problem
            on a cluster resource, then a CLUSTER_AUTHORIZATION_FAILED (error code: 31) will be returned.</li>
        <li><b>TransactionalId:</b> this resource represents actions related to transactions, such as committing.
            If an authorization error occurs, then a TRANSACTIONAL_ID_AUTHORIZATION_FAILED (error code: 53) will be returned by brokers.</li>
        <li><b>DelegationToken:</b> this represents the delegation tokens in the cluster. Actions such as describing
            delegation tokens can be protected by a privilege on the DelegationToken resource. Because these objects
            behave somewhat specially in Kafka, it is recommended to read
            <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka#KIP-48DelegationtokensupportforKafka-DescribeDelegationTokenRequest">KIP-48</a>
            and the related upstream documentation at <a href="#security_delegation_token">Authentication using Delegation Tokens</a>.</li>
        <li><b>User:</b> CreateTokens and DescribeTokens operations can be granted to User resources to allow creating and describing
            tokens for other users. More info can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-373%3A+Allow+users+to+create+delegation+tokens+for+other+users">KIP-373</a>.</li>
    </ul>
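    <p>When triaging client errors, it can help to keep the resource-to-error mapping from the list above at hand. The following is a minimal sketch of such a lookup. The function name <code>authz_error_for</code> is hypothetical, and it covers only the error codes named in this section.</p>

```shell
# Hypothetical lookup, for illustration only: prints the error name and code
# that the list above associates with each resource type.
#   $1 = resource type as written in this section
authz_error_for() {
  case "$1" in
    Topic)           echo "TOPIC_AUTHORIZATION_FAILED 29" ;;
    Group)           echo "GROUP_AUTHORIZATION_FAILED 30" ;;
    Cluster)         echo "CLUSTER_AUTHORIZATION_FAILED 31" ;;
    TransactionalId) echo "TRANSACTIONAL_ID_AUTHORIZATION_FAILED 53" ;;
    # DelegationToken and User have no error code named in this section.
    *)               echo "no error code listed in this section" ;;
  esac
}
```

    <p>For example, <code>authz_error_for Group</code> prints the name and code to look for when a consumer cannot join its group.</p>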
    <h5 class="anchor-heading"><a id="operations_resources_and_protocols" class="anchor-link"></a><a href="#operations_resources_and_protocols">Operations and Resources on Protocols</a></h5>
    <p>The table below lists the valid operations on resources that are executed by the Kafka API protocols.</p>
    <table class="data-table">
        <thead>
        <tr>
            <th>Protocol (API key)</th>
            <th>Operation</th>
            <th>Resource</th>
            <th>Note</th>
        </tr>
        </thead>
        <tbody>
        <tr>
            <td>PRODUCE (0)</td>
            <td>Write</td>
            <td>TransactionalId</td>
            <td>A transactional producer which has its transactional.id set requires this privilege.</td>
        </tr>
        <tr>
            <td>PRODUCE (0)</td>
            <td>IdempotentWrite</td>
            <td>Cluster</td>
            <td>An idempotent produce action requires this privilege.</td>
        </tr>
        <tr>
            <td>PRODUCE (0)</td>
            <td>Write</td>
            <td>Topic</td>
            <td>This applies to a normal produce action.</td>
        </tr>
        <tr>
            <td>FETCH (1)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td>A follower must have ClusterAction on the Cluster resource in order to fetch partition data.</td>
        </tr>
        <tr>
            <td>FETCH (1)</td>
            <td>Read</td>
            <td>Topic</td>
            <td>Regular Kafka consumers need READ permission on each partition they are fetching.</td>
        </tr>
        <tr>
            <td>LIST_OFFSETS (2)</td>
            <td>Describe</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>METADATA (3)</td>
            <td>Describe</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>METADATA (3)</td>
            <td>Create</td>
            <td>Cluster</td>
            <td>If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster
                level privilege. If it's found then it'll allow creating the topic, otherwise it'll iterate through the
                Topic level privileges (see the next one).</td>
        </tr>
        <tr>
            <td>METADATA (3)</td>
            <td>Create</td>
            <td>Topic</td>
            <td>This authorizes auto topic creation if enabled but the given user doesn't have a cluster level
                permission (above).</td>
        </tr>
        <tr>
            <td>LEADER_AND_ISR (4)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>STOP_REPLICA (5)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>UPDATE_METADATA (6)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>CONTROLLED_SHUTDOWN (7)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>OFFSET_COMMIT (8)</td>
            <td>Read</td>
            <td>Group</td>
            <td>An offset can only be committed if the principal is authorized for both the given group and the topic (see below).
                Group access is checked first, then Topic access.</td>
        </tr>
        <tr>
            <td>OFFSET_COMMIT (8)</td>
            <td>Read</td>
            <td>Topic</td>
            <td>Since offset commit is part of the consuming process, it needs privileges for the read action.</td>
        </tr>
        <tr>
            <td>OFFSET_FETCH (9)</td>
            <td>Describe</td>
            <td>Group</td>
            <td>As with OFFSET_COMMIT, the application must have privileges at both the group and the topic level to be able
                to fetch. In this case, however, it requires Describe access instead of Read. Group access is checked first,
                then Topic access.</td>
        </tr>
        <tr>
            <td>OFFSET_FETCH (9)</td>
            <td>Describe</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>FIND_COORDINATOR (10)</td>
            <td>Describe</td>
            <td>Group</td>
            <td>The FIND_COORDINATOR request can be of "Group" type, in which case it is looking for consumer group
                coordinators. This privilege would represent the Group mode.</td>
        </tr>
        <tr>
            <td>FIND_COORDINATOR (10)</td>
            <td>Describe</td>
            <td>TransactionalId</td>
            <td>This applies only to transactional producers and is checked when a producer tries to find the transaction
                coordinator.</td>
        </tr>
        <tr>
            <td>JOIN_GROUP (11)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>HEARTBEAT (12)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>LEAVE_GROUP (13)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>SYNC_GROUP (14)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_GROUPS (15)</td>
            <td>Describe</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>LIST_GROUPS (16)</td>
            <td>Describe</td>
            <td>Cluster</td>
            <td>When the broker authorizes a LIST_GROUPS request, it first checks for this cluster
                level authorization. If none is found, it proceeds to check the groups individually. This operation
                doesn't return CLUSTER_AUTHORIZATION_FAILED.</td>
        </tr>
        <tr>
            <td>LIST_GROUPS (16)</td>
            <td>Describe</td>
            <td>Group</td>
            <td>If none of the groups are authorized, then just an empty response will be sent back instead
                of an error. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the
                2.1 release.</td>
        </tr>
        <tr>
            <td>SASL_HANDSHAKE (17)</td>
            <td></td>
            <td></td>
            <td>The SASL handshake is part of the authentication process and therefore it's not possible to
                apply any kind of authorization here.</td>
        </tr>
        <tr>
            <td>API_VERSIONS (18)</td>
            <td></td>
            <td></td>
            <td>The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection
                and before any authentication. Therefore it's not possible to control this with authorization.</td>
        </tr>
        <tr>
            <td>CREATE_TOPICS (19)</td>
            <td>Create</td>
            <td>Cluster</td>
            <td>If there is no cluster level authorization, then it won't return CLUSTER_AUTHORIZATION_FAILED but
                falls back to the topic level check just below, which returns an error if authorization fails.</td>
        </tr>
        <tr>
            <td>CREATE_TOPICS (19)</td>
            <td>Create</td>
            <td>Topic</td>
            <td>This is applicable from the 2.0 release.</td>
        </tr>
        <tr>
            <td>DELETE_TOPICS (20)</td>
            <td>Delete</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>DELETE_RECORDS (21)</td>
            <td>Delete</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>INIT_PRODUCER_ID (22)</td>
            <td>Write</td>
            <td>TransactionalId</td>
            <td></td>
        </tr>
        <tr>
            <td>INIT_PRODUCER_ID (22)</td>
            <td>IdempotentWrite</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>OFFSET_FOR_LEADER_EPOCH (23)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td>If there is no cluster level privilege for this operation, then it'll check for topic level one.</td>
        </tr>
        <tr>
            <td>OFFSET_FOR_LEADER_EPOCH (23)</td>
            <td>Describe</td>
            <td>Topic</td>
            <td>This is applicable from the 2.1 release.</td>
        </tr>
        <tr>
            <td>ADD_PARTITIONS_TO_TXN (24)</td>
            <td>Write</td>
            <td>TransactionalId</td>
            <td>This API is only applicable to transactional requests. It first checks for the Write action on the
                TransactionalId resource, then it checks the Topic in subject (below).</td>
        </tr>
        <tr>
            <td>ADD_PARTITIONS_TO_TXN (24)</td>
            <td>Write</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>ADD_OFFSETS_TO_TXN (25)</td>
            <td>Write</td>
            <td>TransactionalId</td>
            <td>Similarly to ADD_PARTITIONS_TO_TXN, this is only applicable to transactional requests. It first checks
                for the Write action on the TransactionalId resource, then it checks whether it can Read on the given group
                (below).</td>
        </tr>
        <tr>
            <td>ADD_OFFSETS_TO_TXN (25)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>END_TXN (26)</td>
            <td>Write</td>
            <td>TransactionalId</td>
            <td></td>
        </tr>
        <tr>
            <td>WRITE_TXN_MARKERS (27)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>WRITE_TXN_MARKERS (27)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>TXN_OFFSET_COMMIT (28)</td>
            <td>Write</td>
            <td>TransactionalId</td>
            <td></td>
        </tr>
        <tr>
            <td>TXN_OFFSET_COMMIT (28)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>TXN_OFFSET_COMMIT (28)</td>
            <td>Read</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_ACLS (29)</td>
            <td>Describe</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>CREATE_ACLS (30)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DELETE_ACLS (31)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_CONFIGS (32)</td>
            <td>DescribeConfigs</td>
            <td>Cluster</td>
            <td>If broker configs are requested, then the broker will check cluster level privileges.</td>
        </tr>
        <tr>
            <td>DESCRIBE_CONFIGS (32)</td>
            <td>DescribeConfigs</td>
            <td>Topic</td>
            <td>If topic configs are requested, then the broker will check topic level privileges.</td>
        </tr>
        <tr>
            <td>ALTER_CONFIGS (33)</td>
            <td>AlterConfigs</td>
            <td>Cluster</td>
            <td>If broker configs are altered, then the broker will check cluster level privileges.</td>
        </tr>
        <tr>
            <td>ALTER_CONFIGS (33)</td>
            <td>AlterConfigs</td>
            <td>Topic</td>
            <td>If topic configs are altered, then the broker will check topic level privileges.</td>
        </tr>
        <tr>
            <td>ALTER_REPLICA_LOG_DIRS (34)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_LOG_DIRS (35)</td>
            <td>Describe</td>
            <td>Cluster</td>
            <td>An empty response will be returned on authorization failure.</td>
        </tr>
        <tr>
            <td>SASL_AUTHENTICATE (36)</td>
            <td></td>
            <td></td>
            <td>SASL_AUTHENTICATE is part of the authentication process and therefore it's not possible to
                apply any kind of authorization here.</td>
        </tr>
        <tr>
            <td>CREATE_PARTITIONS (37)</td>
            <td>Alter</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>CREATE_DELEGATION_TOKEN (38)</td>
            <td></td>
            <td></td>
            <td>Creating delegation tokens has special rules; please see the
                <a id="security_delegation_token_1" href="#security_delegation_token">Authentication using Delegation Tokens</a> section.</td>
        </tr>
        <tr>
            <td>CREATE_DELEGATION_TOKEN (38)</td>
            <td>CreateTokens</td>
            <td>User</td>
            <td>Allows creating delegation tokens for the User resource.</td>
        </tr>
        <tr>
            <td>RENEW_DELEGATION_TOKEN (39)</td>
            <td></td>
            <td></td>
            <td>Renewing delegation tokens has special rules; please see the
                <a id="security_delegation_token_2" href="#security_delegation_token">Authentication using Delegation Tokens</a> section.</td>
        </tr>
        <tr>
            <td>EXPIRE_DELEGATION_TOKEN (40)</td>
            <td></td>
            <td></td>
            <td>Expiring delegation tokens has special rules; please see the
                <a id="security_delegation_token_3" href="#security_delegation_token">Authentication using Delegation Tokens</a> section.</td>
        </tr>
        <tr>
            <td>DESCRIBE_DELEGATION_TOKEN (41)</td>
            <td>Describe</td>
            <td>DelegationToken</td>
            <td>Describing delegation tokens has special rules; please see the
                <a id="security_delegation_token_4" href="#security_delegation_token">Authentication using Delegation Tokens</a> section.</td>
        </tr>
        <tr>
            <td>DESCRIBE_DELEGATION_TOKEN (41)</td>
            <td>DescribeTokens</td>
            <td>User</td>
            <td>Allows describing delegation tokens of the User resource.</td>
        </tr>
        <tr>
            <td>DELETE_GROUPS (42)</td>
            <td>Delete</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>ELECT_PREFERRED_LEADERS (43)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>INCREMENTAL_ALTER_CONFIGS (44)</td>
            <td>AlterConfigs</td>
            <td>Cluster</td>
            <td>If broker configs are altered, then the broker will check cluster level privileges.</td>
        </tr>
        <tr>
            <td>INCREMENTAL_ALTER_CONFIGS (44)</td>
            <td>AlterConfigs</td>
            <td>Topic</td>
            <td>If topic configs are altered, then the broker will check topic level privileges.</td>
        </tr>
        <tr>
            <td>ALTER_PARTITION_REASSIGNMENTS (45)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>LIST_PARTITION_REASSIGNMENTS (46)</td>
            <td>Describe</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>OFFSET_DELETE (47)</td>
            <td>Delete</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>OFFSET_DELETE (47)</td>
            <td>Read</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_CLIENT_QUOTAS (48)</td>
            <td>DescribeConfigs</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>ALTER_CLIENT_QUOTAS (49)</td>
            <td>AlterConfigs</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_USER_SCRAM_CREDENTIALS (50)</td>
            <td>Describe</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>ALTER_USER_SCRAM_CREDENTIALS (51)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>VOTE (52)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>BEGIN_QUORUM_EPOCH (53)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>END_QUORUM_EPOCH (54)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_QUORUM (55)</td>
            <td>Describe</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>ALTER_PARTITION (56)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>UPDATE_FEATURES (57)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>ENVELOPE (58)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>FETCH_SNAPSHOT (59)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_CLUSTER (60)</td>
            <td>Describe</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_PRODUCERS (61)</td>
            <td>Read</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>BROKER_REGISTRATION (62)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>BROKER_HEARTBEAT (63)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>UNREGISTER_BROKER (64)</td>
            <td>Alter</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_TRANSACTIONS (65)</td>
            <td>Describe</td>
            <td>TransactionalId</td>
            <td></td>
        </tr>
        <tr>
            <td>LIST_TRANSACTIONS (66)</td>
            <td>Describe</td>
            <td>TransactionalId</td>
            <td></td>
        </tr>
        <tr>
            <td>ALLOCATE_PRODUCER_IDS (67)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>CONSUMER_GROUP_HEARTBEAT (68)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>CONSUMER_GROUP_DESCRIBE (69)</td>
            <td>Describe</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>CONTROLLER_REGISTRATION (70)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>GET_TELEMETRY_SUBSCRIPTIONS (71)</td>
            <td></td>
            <td></td>
            <td>No authorization check is performed for this request.</td>
        </tr>
        <tr>
            <td>PUSH_TELEMETRY (72)</td>
            <td></td>
            <td></td>
            <td>No authorization check is performed for this request.</td>
        </tr>
        <tr>
            <td>ASSIGN_REPLICAS_TO_DIRS (73)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>LIST_CLIENT_METRICS_RESOURCES (74)</td>
            <td>DescribeConfigs</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DESCRIBE_TOPIC_PARTITIONS (75)</td>
            <td>Describe</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>SHARE_GROUP_HEARTBEAT (76)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>SHARE_GROUP_DESCRIBE (77)</td>
            <td>Describe</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>SHARE_FETCH (78)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>SHARE_FETCH (78)</td>
            <td>Read</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>SHARE_ACKNOWLEDGE (79)</td>
            <td>Read</td>
            <td>Group</td>
            <td></td>
        </tr>
        <tr>
            <td>SHARE_ACKNOWLEDGE (79)</td>
            <td>Read</td>
            <td>Topic</td>
            <td></td>
        </tr>
        <tr>
            <td>INITIALIZE_SHARE_GROUP_STATE (83)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>READ_SHARE_GROUP_STATE (84)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>WRITE_SHARE_GROUP_STATE (85)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>DELETE_SHARE_GROUP_STATE (86)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        <tr>
            <td>READ_SHARE_GROUP_STATE_SUMMARY (87)</td>
            <td>ClusterAction</td>
            <td>Cluster</td>
            <td></td>
        </tr>
        </tbody>
    </table>
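    <p>The table above can be read per client role. As a minimal sketch, the following collects the rows for the protocol calls a plain consumer in a consumer group typically makes and reduces them to the distinct privileges involved. The function names and the selection of calls are assumptions made for illustration. The operation and resource values are copied from the table.</p>

```shell
# Hypothetical helper: one line per protocol call, in the form
# "API Operation Resource", copied from the table above.
consumer_calls() {
  printf '%s\n' \
    "METADATA Describe Topic" \
    "FIND_COORDINATOR Describe Group" \
    "JOIN_GROUP Read Group" \
    "SYNC_GROUP Read Group" \
    "HEARTBEAT Read Group" \
    "OFFSET_FETCH Describe Group" \
    "OFFSET_FETCH Describe Topic" \
    "LIST_OFFSETS Describe Topic" \
    "FETCH Read Topic" \
    "OFFSET_COMMIT Read Group" \
    "OFFSET_COMMIT Read Topic" \
    "LEAVE_GROUP Read Group"
}

# Prints the distinct "Operation Resource" pairs behind those calls.
consumer_privileges() {
  consumer_calls | cut -d' ' -f2,3 | sort -u
}
```

    <p>Calling <code>consumer_privileges</code> shows that only Read and Describe on the Topic and Group resources are involved. Individual operations can be granted with the <code>--operation</code> option of <code>kafka-acls.sh</code>, as in the CreateTokens example earlier in this section.</p>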

    <h3 class="anchor-heading"><a id="security_rolling_upgrade" class="anchor-link"></a><a href="#security_rolling_upgrade">7.6 Incorporating Security Features in a Running Cluster</a></h3>
    You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:
    <p></p>
    <ul>
        <li>Incrementally bounce the cluster nodes to open additional secured port(s).</li>
        <li>Restart clients using the secured rather than PLAINTEXT port (assuming you are securing the client-broker connection).</li>
        <li>Incrementally bounce the cluster again to enable broker-to-broker security (if this is required).</li>
        <li>A final incremental bounce to close the PLAINTEXT port.</li>
    </ul>
    <p></p>
    The specific steps for configuring SSL and SASL are described in sections <a href="#security_ssl">7.3</a> and <a href="#security_sasl">7.4</a>.
    Follow these steps to enable security for your desired protocol(s).
    <p></p>
    The security implementation lets you configure different protocols for both broker-client and broker-broker communication.
    These must be enabled in separate bounces. A PLAINTEXT port must be left open throughout so brokers and/or clients can continue to communicate.
    <p></p>

    When performing an incremental bounce, stop the brokers cleanly via a SIGTERM. It's also good practice to wait for restarted replicas to return to the ISR list before moving on to the next node.
    <p></p>
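    <p>The wait between nodes can be scripted. The following is a minimal sketch of a polling helper, not an official tool. The name <code>wait_until_quiet</code> and the <code>POLL_INTERVAL</code> variable are hypothetical. It treats a check command that succeeds and prints nothing as the signal to continue.</p>

```shell
# Hypothetical helper: runs a check command repeatedly until it succeeds
# and prints nothing, or gives up after a number of attempts.
#   $1 = maximum attempts, remaining arguments = check command to run
# POLL_INTERVAL (seconds between attempts, default 5) is an assumed name.
wait_until_quiet() {
  attempts=$1
  shift
  while [ "$attempts" -gt 0 ]; do
    # A failing check command is never treated as "quiet".
    if output=$("$@"); then
      if [ -z "$output" ]; then return 0; fi
    fi
    attempts=$((attempts - 1))
    sleep "${POLL_INTERVAL:-5}"
  done
  return 1
}
```

    <p>One possible check command, assuming the PLAINTEXT listener on <code>broker1:9091</code> used in the example in this section, is <code>wait_until_quiet 60 bin/kafka-topics.sh --bootstrap-server broker1:9091 --describe --under-replicated-partitions</code>, which prints nothing once no partition is under-replicated.</p>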
    As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
    <pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092</code></pre>

    We then restart the clients, changing their config to point at the newly opened, secured port:

    <pre><code class="language-text">bootstrap.servers = [broker1:9092,...]
security.protocol = SSL
...etc</code></pre>

    In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):

    <pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
security.inter.broker.protocol=SSL</code></pre>

    In the final bounce we secure the cluster by closing the PLAINTEXT port:

    <pre><code class="language-text">listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL</code></pre>

    Alternatively, we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wish to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but also want to add SASL authentication to the broker-client connection. We would achieve this by opening two additional ports during the first bounce:

    <pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093</code></pre>

    We would then restart the clients, changing their config to point at the newly opened, SASL &amp; SSL secured port:

    <pre><code class="language-text">bootstrap.servers = [broker1:9093,...]
security.protocol = SASL_SSL
...etc</code></pre>

    The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:

    <pre><code class="language-text">listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL</code></pre>

    The final bounce secures the cluster by closing the PLAINTEXT port:

    <pre><code class="language-text">listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL</code></pre>
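
    <p>After the final bounce, the <code>listeners</code> value can be checked to confirm that no PLAINTEXT port remains. The following is a minimal sketch, not a Kafka tool. The function name <code>plaintext_closed</code> is hypothetical, and it assumes that listener names equal protocol names, as in the examples above.</p>

```shell
# Hypothetical check: returns 0 only if no entry in the given listeners
# value uses the PLAINTEXT protocol.
#   $1 = value of listeners, e.g. SSL://broker1:9092,SASL_SSL://broker1:9093
plaintext_closed() {
  old_ifs=$IFS
  IFS=,
  # With IFS set to a comma, the unquoted $1 splits into one entry per listener.
  for listener in $1; do
    case "$listener" in
      PLAINTEXT://*) IFS=$old_ifs; return 1 ;;
    esac
  done
  IFS=$old_ifs
  return 0
}
```

    <p>For example, <code>plaintext_closed "SSL://broker1:9092,SASL_SSL://broker1:9093"</code> succeeds, while the value used during the earlier bounces, which still contains <code>PLAINTEXT://broker1:9091</code>, makes it fail.</p>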

    ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section <a href="#zk_authz_migration">7.7.2</a>.


    <h3 class="anchor-heading"><a id="zk_authz" class="anchor-link"></a><a href="#zk_authz">7.7 ZooKeeper Authentication</a></h3>
    ZooKeeper supports mutual TLS (mTLS) authentication beginning with the 3.5.x versions.
    Kafka supports authenticating to ZooKeeper with SASL and mTLS -- either individually or both together --
    beginning with version 2.5. See
    <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication">KIP-515: Enable ZK client to use the new TLS supported authentication</a>
    for more details.
    <p>When using mTLS alone, every broker and every CLI tool (such as the <a href="#zk_authz_migration">ZooKeeper Security Migration Tool</a>)
        should identify itself with the same Distinguished Name (DN), because it is the DN that is ACL'ed.
        This can be changed as described below, but it involves writing and deploying a custom ZooKeeper authentication provider.
        Generally each certificate should have the same DN but a different Subject Alternative Name (SAN)
        so that hostname verification of the brokers and any CLI tools by ZooKeeper will succeed.
    </p>
    <p>
        When using SASL authentication to ZooKeeper together with mTLS, both the SASL identity and
        either the DN that created the znode (i.e. the creating broker's certificate)
        or the DN of the Security Migration Tool (if migration was performed after the znode was created)
        will be ACL'ed, and all brokers and CLI tools will be authorized even if they all use different DNs
        because they will all use the same ACL'ed SASL identity.
        It is only when using mTLS authentication alone that all the DNs must match (and SANs become critical --
        again, in the absence of writing and deploying a custom ZooKeeper authentication provider as described below).
    </p>
    <p>
        Use the broker properties file to set TLS configs for brokers as described below.
    </p>
    <p>
        Use the <code>--zk-tls-config-file &lt;file&gt;</code> option to set TLS configs in the ZooKeeper Security Migration Tool.
        The <code>kafka-acls.sh</code> and <code>kafka-configs.sh</code> CLI tools also support the <code>--zk-tls-config-file &lt;file&gt;</code> option.
    </p>
    <p>
        Use the <code>-zk-tls-config-file &lt;file&gt;</code> option (note the single-dash rather than double-dash)
        to set TLS configs for the <code>zookeeper-shell.sh</code> CLI tool.
    </p>
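    <p>
        As a minimal sketch, a TLS config file passed via <code>--zk-tls-config-file</code> (or <code>-zk-tls-config-file</code> for <code>zookeeper-shell.sh</code>) might look like the following.
        It assumes the file uses the same <code>zookeeper.ssl.*</code> client property names as the broker configuration shown later in this section;
        the file name, paths and passwords are placeholders.
    </p>
    <pre><code class="language-text"># hypothetical file, e.g. /path/to/zk-tls.properties (the name is illustrative only)
zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# placeholder key/trust store locations and passwords
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd</code></pre>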
    <h4 class="anchor-heading"><a id="zk_authz_new" class="anchor-link"></a><a href="#zk_authz_new">7.7.1 New clusters</a></h4>
    <h5 class="anchor-heading"><a id="zk_authz_new_sasl" class="anchor-link"></a><a href="#zk_authz_new_sasl">7.7.1.1 ZooKeeper SASL Authentication</a></h5>
    To enable ZooKeeper SASL authentication on brokers, there are two necessary steps:
    <ol>
        <li> Create a JAAS login file and set the appropriate system property to point to it as described above</li>
        <li> Set the configuration property <code>zookeeper.set.acl</code> in each broker to true</li>
    </ol>
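    <p>
        For the first step, a JAAS login file for a Kerberos-based setup might contain a <code>Client</code> section like the sketch below
        (<code>Client</code> is the section name the ZooKeeper client looks up by default).
        The keytab path and principal are placeholders, not values taken from a real deployment.
        The file is typically handed to the broker JVM through the <code>java.security.auth.login.config</code> system property.
    </p>
    <pre><code class="language-text">// Section used by the broker when it authenticates to ZooKeeper.
// The keytab path and principal below are hypothetical placeholders.
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};</code></pre>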

    The metadata stored in ZooKeeper for the Kafka cluster is world-readable, but can only be modified by the brokers. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of that data can cause cluster disruption. We also recommend limiting the access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper).

    <h5 class="anchor-heading"><a id="zk_authz_new_mtls" class="anchor-link"></a><a href="#zk_authz_new_mtls">7.7.1.2 ZooKeeper Mutual TLS Authentication</a></h5>
    ZooKeeper mTLS authentication can be enabled with or without SASL authentication.  As mentioned above,
    when using mTLS alone, every broker and every CLI tool (such as the <a href="#zk_authz_migration">ZooKeeper Security Migration Tool</a>)
    must generally identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed, which means
    each certificate should have an appropriate Subject Alternative Name (SAN) so that
    hostname verification of the brokers and any CLI tool by ZooKeeper will succeed.
    <p>
        It is possible to use something other than the DN for the identity of mTLS clients by writing a class that
        extends <code>org.apache.zookeeper.server.auth.X509AuthenticationProvider</code> and overrides the method
        <code>protected String getClientId(X509Certificate clientCert)</code>.
        Choose a scheme name and set <code>authProvider.[scheme]</code> in ZooKeeper to be the fully-qualified class name
        of the custom implementation; then set <code>ssl.authProvider=[scheme]</code> to use it.
    </p>
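    <p>
        As an illustration of the two settings just described, the ZooKeeper configuration for a custom provider might look like this.
        The scheme name <code>kafkax509</code> and the class name are hypothetical; substitute your own.
    </p>
    <pre><code class="language-text"># register the custom provider under a scheme name of your choosing (hypothetical names)
authProvider.kafkax509=com.example.zk.CustomX509AuthenticationProvider
# tell ZooKeeper to use that scheme for TLS client authentication
ssl.authProvider=kafkax509</code></pre>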
    Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication.
    These configurations are described in the
    <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
    <pre><code class="language-text">secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd</code></pre>
    <strong>IMPORTANT</strong>: ZooKeeper does not support setting the key password in the ZooKeeper server keystore
    to a value different from the keystore password itself.
    Be sure to set the key password to be the same as the keystore password.

    <p>Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
        These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
    </p>
    <pre><code class="language-text"># connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes
zookeeper.set.acl=true</code></pre>
    <strong>IMPORTANT</strong>: ZooKeeper does not support setting the key password in the ZooKeeper client (i.e. broker) keystore
    to a value different from the keystore password itself.
    Be sure to set the key password to be the same as the keystore password.

    <h4 class="anchor-heading"><a id="zk_authz_migration" class="anchor-link"></a><a href="#zk_authz_migration">7.7.2 Migrating clusters</a></h4>
    If you are running a version of Kafka that does not support security, or are running with security disabled, and you want to make the cluster secure, execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
    <ol>
        <li>Enable SASL and/or mTLS authentication on ZooKeeper.  If enabling mTLS, you would now have both a non-TLS port and a TLS port, like this:
            <pre><code class="language-text">clientPort=2181
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd</code></pre>
        </li>
        <li>Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations (including connecting to the TLS-enabled ZooKeeper port) as required, which enables brokers to authenticate to ZooKeeper. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs</li>
        <li>If you enabled mTLS, disable the non-TLS port in ZooKeeper</li>
        <li>Perform a second rolling restart of brokers, this time setting the configuration parameter <code>zookeeper.set.acl</code> to true, which enables the use of secure ACLs when creating znodes</li>
        <li>Execute the ZkSecurityMigrator tool by running the script <code>bin/zookeeper-security-migration.sh</code> with <code>zookeeper.acl</code> set to secure. This tool traverses the corresponding sub-trees, changing the ACLs of the znodes. Use the <code>--zk-tls-config-file &lt;file&gt;</code> option if you enabled mTLS.</li>
    </ol>
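    <p>
        When mTLS is being enabled, the broker-side changes across the two rolling restarts can be sketched as follows.
        This reuses the host names, paths and passwords from the sample broker configuration in section 7.7.1.2, all of which are placeholders;
        <code>zookeeper.set.acl=false</code> is written out explicitly in the first phase only for clarity.
    </p>
    <pre><code class="language-text"># first rolling restart (step 2): authenticate to ZooKeeper over TLS, but do not create ACLs yet
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
zookeeper.set.acl=false

# second rolling restart (step 4): keep the settings above and change only this line
zookeeper.set.acl=true</code></pre>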
    <p>It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:</p>
    <ol>
        <li>Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations, which enables brokers to authenticate, but setting <code>zookeeper.set.acl</code> to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes</li>
        <li>Execute the ZkSecurityMigrator tool by running the script <code>bin/zookeeper-security-migration.sh</code> with <code>zookeeper.acl</code> set to unsecure. This tool traverses the corresponding sub-trees, changing the ACLs of the znodes. Use the <code>--zk-tls-config-file &lt;file&gt;</code> option if you need to set TLS configuration.</li>
        <li>If you are disabling mTLS, enable the non-TLS port in ZooKeeper</li>
        <li>Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file and/or removing ZooKeeper mutual TLS configuration (including connecting to the non-TLS-enabled ZooKeeper port) as required</li>
        <li>If you are disabling mTLS, disable the TLS port in ZooKeeper</li>
    </ol>
    Here is an example of how to run the migration tool:
    <pre><code class="language-bash">$ bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181</code></pre>
    <p>Run this to see the full list of parameters:</p>
    <pre><code class="language-bash">$ bin/zookeeper-security-migration.sh --help</code></pre>
    <h4 class="anchor-heading"><a id="zk_authz_ensemble" class="anchor-link"></a><a href="#zk_authz_ensemble">7.7.3 Migrating the ZooKeeper ensemble</a></h4>
    It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do so, perform a rolling restart of the ZooKeeper servers and set a few properties. See above for mTLS information.  Please refer to the ZooKeeper documentation for more detail:
    <ol>
        <li><a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
        <li><a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL">Apache ZooKeeper wiki</a></li>
    </ol>
    <h4 class="anchor-heading"><a id="zk_authz_quorum" class="anchor-link"></a><a href="#zk_authz_quorum">7.7.4 ZooKeeper Quorum Mutual TLS Authentication</a></h4>
    It is possible to enable mTLS authentication between the ZooKeeper servers themselves.
    Please refer to the <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#Quorum+TLS">ZooKeeper documentation</a> for more detail.

    <h3 class="anchor-heading"><a id="zk_encryption" class="anchor-link"></a><a href="#zk_encryption">7.8 ZooKeeper Encryption</a></h3>
    ZooKeeper connections that use mutual TLS are encrypted.
    Beginning with ZooKeeper version 3.5.7 (the version shipped with Kafka version 2.5), ZooKeeper supports a server-side config
    <code>ssl.clientAuth</code> (the valid values, matched case-insensitively, are <code>want</code>, <code>need</code>, and <code>none</code>; the default is <code>need</code>),
    and setting this value to <code>none</code> in ZooKeeper allows clients to connect via a TLS-encrypted connection
    without presenting their own certificate.  Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with just TLS encryption.
    These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
    <pre><code class="language-text"># connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
# no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
zookeeper.set.acl=true</code></pre>
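    <p>
        On the ZooKeeper side, a matching (partial) server configuration might look like the sketch below.
        It mirrors the mTLS sample from section 7.7.1.2 and adds <code>ssl.clientAuth=none</code>; paths and passwords are placeholders.
        Whether the trust store and <code>authProvider.x509</code> lines are still required when no client certificate is requested is not covered here,
        so check the ZooKeeper Admin Guide before removing them.
    </p>
    <pre><code class="language-text">secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
# placeholder key/trust store locations and passwords
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
# encrypt connections without asking clients for a certificate
ssl.clientAuth=none</code></pre>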
</script>

<div class="p-security"></div>
